
Serverless Data Analysis with BigQuery

Course 3, Part 1

(See GCDEC/Dataflow/Notes for Part 2)

Module 1: Data Analysis and Writing Queries

Data Engineers

Data engineers enable decision-making

They do this by:

  • Building data pipelines
  • Ingesting data
  • Processing data
  • Building analysis tools
  • Building dashboards
  • Building machine-learning models

Enabling decision-making in a systematic way

Data engineers must know both programming and statistics in depth

Advantage of cloud services:

  • The programming you need to know has gotten simpler
  • The statistics side has also gotten simpler
  • These have enabled better analysis - programming with data, building statistical machine learning models, etc.
  • Enables end-to-end data engineers: building data pipelines, all the way through building statistical machine learning models

Serverless Data Pipelines

Building data pipelines using BigQuery and Cloud Dataflow

What you need:

  • Python or Java (for Dataflow)
  • SQL (for BigQuery)

BigQuery:

  • No-ops data warehouse solution - you ask it a question, you get an answer, and that's it (scales to petabytes)
  • Topics: queries, functions, load/export, nested repeated fields, windows, UDFs
  • Labs: queries and functions, loading and exporting data, demos

Cloud Dataflow:

  • No-ops data pipeline for scalable data processing - you ask it to process data in a certain way, and you get the result (scales to large streams)
  • Writing programs to process data (batch or streaming, code can work on both)
  • Topics: pipeline concepts, using MapReduce, side inputs, streaming
  • Labs: Simple pipelines, MapReduce, side inputs, demos

BigQuery

BigQuery:

  • How it works
  • Constructing queries and using functions
  • Loading data into BigQuery
  • Exporting data from BigQuery
  • Advanced capabilities (nesting, etc.)
  • Performance and pricing

Labs: very structured; recommended you try different tasks with different data sets

History:

  • GFS/MapReduce papers (2002-2004), Dataproc, BigTable
  • Part of Google Big Data Stack 1.0
  • Dremel/BigQuery (2006), Dataflow (2014), TensorFlow (2015)
  • Part of Google Big Data Stack 2.0

Big Data Stack 2.0 serverless approach is a result of problems with the MapReduce framework

MapReduce framework:

  • Parallelize your data storage (sharded datasets)
  • Parallelize the map operations, so each map operation works on whatever shards of data the compute node owns
  • These results are then recombined on a different set of machines in the reduce operation/step
  • The MapReduce framework requires a prior step: you have to take your data, and you have to shard it.
  • This presents a scaling problem, because now you are mixing up storage and compute
  • The number of machines you need drives how you split your data
  • Each time you do a compute job, you first need to figure out where the data are stored

The TLDR: The serverless approach scales your infrastructure to fit your data; the MapReduce approach scales your data to fit your infrastructure.

bigquery.cloud.google.com

Two examples of BigQuery queries:

SELECT
    nppes_provider_state AS state,
    ROUND(SUM(total_claim_count)/1e6) AS total_claim_count_millions
FROM
    `bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
    state
ORDER BY
    total_claim_count_millions DESC
LIMIT
    5;

This can easily be switched to listing claims by drug, instead of by state: just change the two state field names to drug.

SELECT
    drug,
    ROUND(SUM(total_claim_count)/1e6) AS total_claim_count_millions
FROM
    `bigquery-public-data.medicare.part_d_prescriber_2014`
GROUP BY
    drug
ORDER BY
    total_claim_count_millions DESC
LIMIT
    5;

Features/components:

  • Interact with petabyte scale datasets
  • Uses SQL 2011 query language and SQL functions
  • Multiple methods for importing/exporting (incl. third party tools)
  • Nested and repeated fields (JSON logs) are ok - although this goes beyond the regular SQL language
  • User-defined functions in Javascript
  • Data storage inexpensive
  • Queries are charged based on amount of data processed (also fixed-cost plans)
  • Immutable audit logs
  • Can use access control to share data with others (without hassles of ops), so can enable data collaboration
  • Can also share queries, so e.g., "How many people are taking course X" - respond with a query that they can open anytime

BigQuery Project Architecture

Pattern for analytics architecture:

  • Mobile gaming - things are happening continuously in the game, generating a stream of events
  • Events are passed (through game server authentication) to PubSub for asynchronous messaging
  • Messaging is fed to Dataflow for parallel data processing
  • Dataflow outputs to BigQuery
  • BigQuery data can then be analyzed using Datalab, or BI tools like Tableau, or spreadsheets, etc.

There is also a batch component (e.g., historical data)

  • Batch load sent to cloud storage (historical data, raw log storage)
  • Storage and logs are sent to Dataflow for parallel processing (batch pipeline)
  • Dataflow outputs to BigQuery
  • etc.

Access control:

  • Project level - top level, where the billing occurs
  • Dataset level - organization and access control level
  • Tables - data with schema (not where access control occurs)
  • Access control happens at the dataset level, because you want to be able to join tables together

Tables:

  • Table views are virtual tables, defined by SQL queries
  • Tables can also be external (e.g., live on Cloud Storage)

Columns vs Rows:

  • Relational databases are row-based (record-oriented storage) to support updates to existing records
  • BigQuery storage is column-based (more compact/compressed, easier to replicate data)
  • BigQuery does not utilize keys
  • Designed for massive and immutable datasets

Consequences for cost: running a query on fewer columns means lower cost


Queries and Functions

Note: to get all of the data sets, open bigquery.cloud.google.com. Click the triangle next to the project name, pick Switch to project > Display project, then enter the following:

  • bigquery-samples
  • publicdata
  • bigquery-public-data


Run queries from web console (bigquery.cloud.google.com)

When calling BigQuery from a client program (Java/Python), you need to specify the project in the FROM clause (project.dataset.table); in the web console this is not needed, since the current project is assumed.

Consider example query:

SELECT
    airline,
    SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
    COUNT(arrival_delay) AS total_flights

FROM
    `bigquery-samples.airline_ontime_data.flights`

WHERE
    arrival_airport='OKC'
    AND departure_airport='DFW'

GROUP BY
    airline


SQL features to point out:

  • Can rename variables "on the fly" and return them with more convenient handles
  • SELECT some-inconveniently-long-name AS result1
  • Can perform aggregate operations (or even conditional aggregate operations) on the fields being selected
  • SELECT airline, SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed
  • Can perform counts of non-null fields
  • SELECT COUNT(arrival_delay) AS total_flights
  • (This arrival_delay field would be null if a flight were canceled, otherwise it is a number)
  • Can perform counts of non-null, non-zero fields
  • SELECT COUNT(IF(arrival_delay > 0, 1, 0)) AS total_delayed_flights
  • Can set conditions using the WHERE clause
  • WHERE arrival_airport='OKC'
  • WHERE departure_airport='DFW'
  • Can also combine these with logical operators:
  • WHERE arrival_airport='OKC' AND departure_airport='DFW'
  • We can group records (think of this as tossing them all into bags)
  • GROUP BY airline, departure_airport
  • This creates a bag for each combination of airline and departure airport
  • Each bag contains many flights - we are performing aggregation operations on each bag (count, sum, etc.)
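
Putting several of these pieces together into one runnable query against the same flights table (this simply combines the snippets above):

SELECT
    airline,
    departure_airport,
    COUNT(arrival_delay) AS total_flights,             -- counts non-null delays only
    SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed    -- conditional aggregate
FROM
    `bigquery-samples.airline_ontime_data.flights`
WHERE
    arrival_airport='OKC'
GROUP BY
    airline, departure_airport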


Performing subqueries

Subqueries use one query to feed another query

Using the query from before, we can feed those results into a new query - we are now performing a query on the TEMPORARY RESULTS TABLE

SELECT
    airline, 
    departure_airport, 
    num_delayed, 
    total_flights, 
    num_delayed/total_flights AS delayed_frac

FROM

    (
    SELECT
        airline, departure_airport,
        SUM(IF(arrival_delay > 0, 1, 0)) AS num_delayed,
        COUNT(arrival_delay) AS total_flights
    FROM
        `bigquery-samples.airline_ontime_data.flights`
    WHERE
        arrival_airport='OKC'
    GROUP BY
        airline, departure_airport
    )

WHERE
    total_flights > 5

ORDER BY delayed_frac DESC

LIMIT 5

The first query returns a table with four columns: airline, departure_airport, num_delayed, total_flights

From those results, we perform a second query that further filters results (and performs a calculation)

This tells you that airline OO, departing Atlanta, is delayed 72% of the time (the most delayed airline + departure airport combination for flights arriving into OKC)


Querying from multiple tables

To query from multiple tables, we specify multiple tables in the FROM clause

This query examines non-internal root actions over the course of three days:

SELECT
    FORMAT_UTC_USEC(event.timestamp_in_usec) AS time,
    request_url

FROM
    [myproject.applogs.events_20120501],
    [myproject.applogs.events_20120502],
    [myproject.applogs.events_20120503]

WHERE
    event.username='root'
    AND NOT event.source_ip.is_internal

If we want to match any tables within a date range, can use a "wildcard" FROM:

FROM
    TABLE_DATE_RANGE(myproject:applogs.events_,
                    TIMESTAMP('2012-05-01'),
                    TIMESTAMP('2012-05-03'))

This makes it easier to select larger date ranges without the complications of wildcards not selecting the right set of tables (May-July, for example)

(TABLE_DATE_RANGE takes the table-name prefix plus a start and end TIMESTAMP, and expands to every daily table whose date suffix falls in that range - see the combined query below.)
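
For reference, here is the earlier applogs query with the wildcard FROM dropped in (legacy SQL; myproject and the applogs tables are the same hypothetical examples as above):

SELECT
    FORMAT_UTC_USEC(event.timestamp_in_usec) AS time,
    request_url
FROM
    TABLE_DATE_RANGE(myproject:applogs.events_,
                    TIMESTAMP('2012-05-01'),
                    TIMESTAMP('2012-05-03'))
WHERE
    event.username='root'
    AND NOT event.source_ip.is_internal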


Joining tables

Nice explanation of inner vs outer joins: http://www.programmerinterview.com/index.php/database-sql/inner-vs-outer-joins/

Join predicate - the setting that tells the join what fields it is actually joining

Inner joins only pick results in common to both

Outer joins fill in records missing from one or the other with NULL

Left outer join leaves in all records that are in the left table, filling in NULL for fields missing in the right table. Any records that exist in the right table but not in the left table are dropped.

Right outer join leaves in all records that are in the right table, filling in NULL for fields missing in the left table. Any records that exist in the left table but not the right table are dropped.

Full outer join will include every record in both left and right, joining records that can be joined, and filling in NULL for missing fields of records that only exist in left or right tables. All records are kept/included in the join.

To join tables, use the JOIN ON statement (the join predicate comes after the ON)

(Note: this uses the fact that we can also create "handles" for tables, like this:)

FROM `bigquery-samples.airline_ontime_data.flights` AS f

Here's the example, which we'll break down:

SELECT
    f.airline,
    SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
    COUNT(f.arrival_delay) AS total_flights

FROM
    `bigquery-samples.airline_ontime_data.flights` as f

JOIN (
    SELECT
        CONCAT(
            CAST(year AS STRING),'-',
            LPAD(CAST(month AS STRING),2,'0'),'-',
            LPAD(CAST(day AS STRING),2,'0')
        ) AS rainyday
    FROM
        `bigquery-samples.weather_geo.gsod`
    WHERE
        station_number = 725030
        AND total_precipitation > 0
    ) as w

ON
    w.rainyday = f.date

WHERE
    f.arrival_airport = 'LGA'

GROUP BY
    f.airline

Start with the subquery:

    SELECT
        CONCAT(
            CAST(year AS STRING),'-',
            LPAD(CAST(month AS STRING),2,'0'),'-',
            LPAD(CAST(day AS STRING),2,'0')
        ) AS rainyday
    FROM
        `bigquery-samples.weather_geo.gsod`
    WHERE
        station_number = 725030
        AND total_precipitation > 0

This is selecting dates where there was precipitation at a particular weather station, and combining separate year/month/day fields into a sensible string that is comparable to the strings in the airline data set. (Left pad means, pad this number into a 2-digit number, using 0s to pad, so 3 is turned into 03)
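
For example, the CONCAT/CAST/LPAD expression turns year=2008, month=5, day=3 into the string '2008-05-03':

SELECT
    CONCAT(
        CAST(2008 AS STRING), '-',
        LPAD(CAST(5 AS STRING), 2, '0'), '-',    -- '5' padded to '05'
        LPAD(CAST(3 AS STRING), 2, '0')          -- '3' padded to '03'
    ) AS rainyday                                -- returns '2008-05-03'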

We are then using a field that we haven't used yet - the date field of the airline delays database - to FILTER the airline delays we are looking at

So the logic here is:

  • Construct a subquery that selects rainy days from the weather data set
  • Do an inner join on the rainy days dates and the airline delays dates, tossing out the rest of the weather data set (this will also discard any flight delay info that occurs on non-rainy days, and any rainy days that we don't have flight delays for)
  • Perform the same aggregation as before, counting number of delays and number of total flights

NOTE: if we want to first explore the sub-query, can run this sub-query; important to add LIMIT 5!

SELECT
    year, month, day, total_precipitation

FROM 
    `bigquery-samples.weather_geo.gsod`

WHERE
    station_number = 725030
    AND total_precipitation > 0

LIMIT 5


Building Queries: Lab

https://codelabs.developers.google.com/codelabs/cpb101-bigquery-query/#0

Goal of lab:

  • Run some queries of data
  • Modify queries to explore different features (functions, booleans, string processing, subqueries, etc.)


Running a basic query

SELECT
  airline,
  date,
  departure_delay
FROM
  `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0
  AND departure_airport = 'LGA'
LIMIT
  100


Aggregate and boolean functions

SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

Change to:

SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0 AND
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

Now run:

SELECT
  f.airline,
  COUNT(f.departure_delay) AS total_flights,
  SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed
FROM
   `bigquery-samples.airline_ontime_data.flights` AS f
WHERE
  f.departure_airport = 'LGA' AND f.date = '2008-05-13'
GROUP BY
  f.airline


String operations

Run the query:

SELECT
  CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
  `bigquery-samples.weather_geo.gsod`
WHERE
  station_number = 725030
  AND total_precipitation > 0

Joining queries on field

Join sql statement:

SELECT
  f.airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline


Using subqueries

Example query that uses a subquery: runs one query, feeds the results on to another query

SELECT
  airline,
  num_delayed,
  total_flights,
  num_delayed / total_flights AS frac_delayed
FROM (
SELECT
  f.airline AS airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
  )
ORDER BY
  frac_delayed ASC

Make Your Own BigQuery

Baseball data set: https://cloud.google.com/bigquery/public-data/baseball

  • games_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 regular season.
  • games_post_wide: Every pitch, steal, or lineup event for each at-bat in the 2016 post season.

(Same schema for both)


Example queries:

"What types of pitches were thrown, how many, top speeds?"

"What was the at bat and pitching sequence for the fastest pitch(es) in the 2016 season?"


Github repo: https://github.com/charlesreid1/sabermetrics-bigquery

Module 1B: Advanced Queries

Loading and Exporting from BigQuery

So far, have covered what to do with data once it is in BigQuery. But important to talk about loading data too.

BigQuery is a primary endpoint for data being ingested/captured and processed

Ingestion/capture can happen through:

  • App Engine
  • Cloud Logging
  • Google Analytics
  • PubSub
  • Cloud Monitoring

Processing happens via Dataflow (batch/stream) or Dataproc

BigQuery then helps with the storage and analysis steps:

  • Store results of computations in BigQuery (or Cloud Storage)
  • Data warehouse (SQL for analytics)
  • Visualize data stored in BigQuery using Datalab/third party tools

Loading data:

  • Command line interface - bq tool
  • Web UI - drag-and-drop data
  • API - from Python/Java/etc, or another tool like Dataflow/Datalab

Next lab will cover loading/exporting data to/from BigQuery using bq tool, web UI


Loading BigQuery Data Using Web UI

Airports.csv: https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/airports.csv

Link: https://codelabs.developers.google.com/codelabs/cpb101-bigquery-data/#0

Local file: airports.csv

  • First, need to create a dataset
  • Then, create a table in the dataset

Create new dataset (must be unique to the project)

Click the plus button next to it to add a table

Pick upload file, and select the file locally on disk

Name the destination table

For the data types of each field:

  • Can enter information manually
  • Can also check box to have BigQuery try and guess

If entering manually, use drop-downs to see what types are available. Then click "Edit as text" and enter:

IATA:STRING,
AIRPORT:STRING,
CITY:STRING,
STATE:STRING,
COUNTRY:STRING,
LATITUDE:FLOAT,
LONGITUDE:FLOAT

Because the file has a header, we want to skip 1 header row

Helpfully, we are informed that the schema was auto-detected, and if something went wrong we can just re-run the load job, adjusting the parameters of the job to make it work.

Now, preview the table to explore it. Works great.

Try a query: look for airports north of a certain latitude and west of a certain longitude (this just returns Alaska airports...)

SELECT
  AIRPORT,
  CITY,
  STATE
FROM
  `not-all-broken.demo_flights.airports`
WHERE
  LATITUDE > 60
  AND LONGITUDE < -120


Loading BigQuery Data Using CLI

Starting with a data set that is provided:

{"YEAR":"2014","QUARTER":"3","MONTH":"9","DAY_OF_MONTH":"1","DAY_OF_WEEK":"1","FULL_DATE":"2014-09-01","CARRIER":"AA","TAIL_NUMBER":"N794AA","FLIGHT_NUMBER":"1","ORIGIN":"JFK","DESTINATION":"LAX","SCHEDULED_DEPART_TIME":"900","ACTUAL_DEPART_TIME":"851","DEPARTURE_DELAY":"-9","TAKE_OFF_TIME":"910","LANDING_TIME":"1135","SCHEDULED_ARRIVAL_TIME":"1210","ACTUAL_ARRIVAL_TIME":"1144","ARRIVAL_DELAY":"-26","FLIGHT_CANCELLED":"0","CANCELLATION_CODE":"","SCHEDULED_ELAPSED_TIME":"370","ACTUAL_ELAPSED_TIME":"353","AIR_TIME":"325","DISTANCE":"2475"}

The schema is also provided in a JSON file:

[
    {
        "name": "YEAR",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "QUARTER",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "MONTH",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },

Now, these are uploaded from the command line via the command:

$ bq load --source_format=NEWLINE_DELIMITED_JSON \
    $DEVSHELL_PROJECT_ID:cpb101_flight_data.flights_2014 \
    gs://cloud-training/CPB200/BQ/lab4/domestic_2014_flights_*.json \
    ./schema_flight_performance.json

  • First argument: the source data format
  • Second argument: the name of the BigQuery table to create
  • Third argument: the data files to load
  • Fourth argument: the location of the schema file


NOTE: realized that the problem (below) was that bq was trying to parse the header row, because it was not told to skip it.

There is a command line option for this: bq load accepts --skip_leading_rows=N for CSV sources (like the "skip header rows" option in the web UI). The alternative is to manually strip the header out of the data file, as done below.

Make a dataset called kaggle:

$ bq mk kaggle

Strip out header row

$ # tail, starting on second line
$ tail -n +2 creditcard.csv > temp.csv && mv temp.csv creditcard.csv

Load the data into the table (adding --replace because I tested this command out with 10 lines of data first):

$ bq load --replace not-all-broken:kaggle.creditcardfraud creditcard.csv schema_cc.json

Here is what schema_cc.json looked like:

[
    {
        "name": "Time",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "V1",
        "type": "FLOAT",
        "mode": "REQUIRED"
    },
...

Choked on this line, due to the 1e+ notation:

1e+05,-1.40863322829137,-1.624697936324,2.54774230369692,0.385671523516146,0.502790072699087,0.507194721385658,-1.74443114473473,0.760594225747498,3.00170400322912,-1.16309493493591,0.393516209370638,-1.9761080196344,0.231283936059375,0.526783847087809,-3.23246747614055,0.021830807795285,0.519902436567202,1.12642627868296,0.0800098779433671,0.209032736561698,0.21764082393781,0.758246770655715,0.281254134069628,0.736607975853892,-0.741402076374565,0.255349902866452,0.141944167181525,0.228167205092217,49.5,"0"

Had to edit by hand. Probably better to use --max_bad_records instead

bq command line tool documentation: https://cloud.google.com/bigquery/bq-command-line-tool


Loading BigQuery Data From GCS Using CLI

If data is on GCS, we can just point to the GCS location, instead of the local location - everything else should stay pretty much the same.

When copying data into GCS, use the -Z flag to compress the data first:

$ gsutil cp -Z creditcard.csv gs://stupid-bukkit/datasets/

Then use "bq load" command

Link: https://cloud.google.com/bigquery/quickstart-command-line

Advanced Queries Using WITH

Link to BigQuery SQL reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/

Types in standard SQL:

  • String
  • Int64
  • Float64
  • Bool
  • Array
  • Struct
  • Timestamp

Structs are analogous to classes - bundles of named data fields that can be accessed individually
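
A minimal illustration (with made-up fields) of building a STRUCT and reading its fields with dot notation:

SELECT
    person.name,    -- dot notation reaches into the STRUCT
    person.age
FROM (
    SELECT STRUCT('Alice' AS name, 30 AS age) AS person
)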

Let's examine a more advanced query using the WITH statement:

WITH WashingtonStations AS 
    (
        SELECT 
            weather.stn AS station_id,
            ANY_VALUE(station.name) AS name
        FROM
            `bigquery-public-data.noaa_gsod.stations` AS station
        INNER JOIN
            `bigquery-public-data.noaa_gsod.gsod2015` AS weather
        ON
            station.usaf = weather.stn
        WHERE
            station.state = 'WA'
            AND 
            station.usaf != '999999'
        GROUP BY
            station_id
    )
SELECT
    washington_stations.name,
    (
        SELECT 
            COUNT(*)
        FROM
            `bigquery-public-data.noaa_gsod.gsod2015` AS weather
        WHERE
            washington_stations.station_id = weather.stn
            AND
            prcp > 0 AND prcp < 99
    )
    AS rainy_days
FROM 
    WashingtonStations AS washington_stations
ORDER BY
    rainy_days DESC

Walking through this Texas Hairball of an SQL query:

  • The with clause defines a subquery that joins noaa_gsod.stations and noaa_gsod.gsod2015 (weather)
  • This selects only DISTINCT rows
  • This means that we've now populated WashingtonStations with a bunch of rows - WA state weather stations that are run by the air force
  • We then run the actual SELECT query, but actually we're running the query on the results of a subquery
  • The subquery gets a count of the number of WA weather stations (from the with clause result) that had rainy days
  • (Here I am getting a little bit lost... are we counting the number of rainy days, or just filtering on stations that experienced rain? Looks like we're counting rainy days.)
  • We then perform a query from that result, to order the weather stations by number of rainy days


Breaking it down further:

  • Start with query in the WITH that is joining two tables.
  • Join predicate is stations.usaf (stations run by USAF) and weather.stn - all weather stations run by USAF
  • Returns station IDs and station names that are run by USAF
  • Taking this result set, we use WITH WashingtonStations - so we are calling all of the results WashingtonStations

Now on to the SELECT statement:

  • We are selecting the name of each of those WA weather stations we found from prior query
  • We are ALSO running a sub-query to get weather data from the weather table (gsod2015), using the station IDs from the query results above
  • The sub-query is selecting all of the days in the weather table where there was precipitation, and summarizing the results as rainy_days


Advanced Queries: ARRAY/STRUCT

Link to BigQuery SQL reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/

The following advanced query will show how to use arrays and structs in SQL when using BigQuery.

WITH TitlesAndScores AS
(
    SELECT
        ARRAY_AGG( STRUCT(title,score) ) AS titles,
        EXTRACT(DATE FROM time_ts) AS date
    
    FROM
        `bigquery-public-data.hacker_news.stories`

    WHERE
        score IS NOT NULL
        AND
        title IS NOT NULL
    GROUP BY
        date
)
SELECT 
    date,
    ARRAY( 
        SELECT AS STRUCT title, score
        FROM UNNEST(titles) 
        ORDER BY score DESC
        LIMIT 2
    )
AS top_articles
FROM TitlesAndScores

Again, we'll break down this hairball of a query into its components.

Start with the WITH clause:

  • Makes title and score columns into an object
  • Makes an array filled with these two-column objects, and calls the array "titles"
  • Extracts the date from the timestamp of each article (standard timestamp format, strip out the time information)
  • Groups by the date
  • The result is a series of arrays: [<title>,<score>]
  • Each [title-score] array is put into a bag labeled with the corresponding date

Note that if we weren't aggregating titles and scores into an array, we would have a very large number of titles and scores for EACH DAY, and so we would get an error that we would need to aggregate the results somehow. If we limited the records we were looking at to just the top items, e.g., replacing ARRAY_AGG(title, score) with MAX(title), MAX(score), this works, but is senseless - titles and scores have been severed, and we get unrelated scores and titles. So the ARRAY_AGG ensures that titles and scores are tied together - a particular title has a particular score. (Finding the title WITH the highest score.) The first step to do this is just to get all of the titles AND THEIR CORRESPONDING scores and pack them up into an array.

Now, examine the sub-query that follows the SELECT statement:

  • We are creating an ARRAY from a sub-query
  • The sub-query is unnesting the array that resulted from the WITH statement
  • It orders each of these by their score (top scoring articles listed first)
  • It then selects the top 2 only
  • The unpacked information is then packed back up into an array (results of query are passed into ARRAY() function)
  • The information that is packed up is the title and score

Outer SELECT query:

  • Gets the list of dates from the sub-query in the WITH clause
  • Recall, those results were grouped into bags, each bag labeled with a date
  • Also gets the ARRAY results from the prior sub-query (title and score of top 2 articles)

Now, we have a series of bags labeled with the date, but in each bag we have only 2 arrays containing [<title>, <score>]


More Complex JOIN Conditions

Link to BigQuery SQL reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/

Link to BigQuery SQL functions and operators: https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators

Link to BigQuery SQL subqueries: https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#subqueries


In previous WITH example (Washington State weather stations), we were doing a JOIN based on equality

JOIN not just limited to equality - can also JOIN using *any* boolean condition (including functions that return a boolean)

SQL functions that return booleans:

  • CAST(x AS BOOL)
  • LOGICAL_AND
  • LOGICAL_OR
  • STARTS_WITH
  • ENDS_WITH
  • REGEXP_CONTAINS

Example query: subquery a table of names, and find which ones occur most frequently in Shakespeare

WITH TopNames AS 
(
    SELECT 
        name, 
        SUM(number) AS occurrences
    FROM
        `bigquery-public-data.usa_names.usa_1910_2013`
    GROUP BY
        name
    ORDER BY
        occurrences DESC LIMIT 100
)
SELECT
    name,
    SUM(word_count) AS frequency
FROM
    TopNames
JOIN
    `bigquery-public-data.samples.shakespeare`
ON
    STARTS_WITH(word,name)
GROUP BY
    name
ORDER BY
    frequency DESC
LIMIT 10

(Question: where is word_count field defined? Is this part of the Shakespeare data set? YES. word_count is a field in the Shakespeare data set.)

The WITH subquery:

  • Finding 100 most common baby names - those are called TopNames
  • From TopNames, we will select each name and join that with the results of the Shakespeare table subquery

SELECT (outer) query:

  • We are querying to create a result of each name and the total word count of Shakespeare words starting with that name - that's the SELECT name, SUM(word_count)
  • Selecting this from the JOIN of name data (TopNames) and Shakespeare data
  • The JOIN condition is that the word in the Shakespeare corpus starts with the same prefix as the top names result
  • So, "Will" in TopNames would match "William" in Shakespeare
  • However, "William" in TopNames would not match "will" in Shakespeare
  • Looking for each word in Shakespeare that starts with a particular name, and grouping them into bags labeled with each name
  • Then we limit that to the top 10


Regular Expressions

SELECT
    word, 
    COUNT(word) as count
FROM
    `publicdata.samples.shakespeare`
WHERE (
    REGEXP_CONTAINS(word, r'^\w\w\'\w\w')
)
GROUP BY
    word
ORDER BY
    count DESC
LIMIT 3

This looks for the start of a word, ^, then two letters, \w\w, then an apostrophe, \', then two more letters

  • Example: "We'll"


Window Functions

If we are looking at a particular row, we can use window functions to say, "Show me the next row" or "Show me the row that is N rows ahead of/behind this row"

Navigation functions:

  • LEAD() - return value of row N rows ahead of current row
  • LAG() - return value of row N rows behind current row
  • NTH_VALUE() - returns the value of the Nth value in the window (example: median, 10th percentile, etc.)

By default there is no ordering - rows come back in arbitrary order. But once you add an ORDER BY, you have defined an order, and you can start to use these window functions.
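
A minimal sketch using LAG over the same flights table from earlier - for each airline, compare a flight's delay with the previous flight's delay when ordered by date:

SELECT
    airline,
    date,
    departure_delay,
    LAG(departure_delay, 1) OVER (
        PARTITION BY airline    -- one window per airline
        ORDER BY date           -- the ordering that makes "previous" meaningful
    ) AS prev_delay             -- NULL for each airline's first row
FROM
    `bigquery-samples.airline_ontime_data.flights`
WHERE
    departure_airport = 'LGA'
LIMIT 10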

Analytical functions, ranking functions, etc, will operate on a *window*

Standard aggregations operate on a window:

  • SUM
  • AVG
  • MIN
  • MAX
  • COUNT

Other ranking functions (also operate on a number of things in a window):

  • CUME_DIST() - return cumulative dist. of a value in a group
  • DENSE_RANK() - return integer rank of value in group
  • ROW_NUMBER() - return current row number of query result
  • RANK() - return integer rank of value in a group
  • PERCENT_RANK() - returns rank of current row, relative to other rows in partition

When we define a function that operates over a window, we have to answer the question, "over what"? The OVER clause specifies the window frame.

Example query:

SELECT
    corpus,
    word,
    word_count,
    RANK() OVER (
        PARTITION BY
            corpus
        ORDER BY
            word_count DESC
    ) AS rank
FROM
    `publicdata.samples.shakespeare`
WHERE
    length(word) > 10 
    AND
    word_count > 10
LIMIT 40

  • Select the corpus - name of the play
  • Select the word
  • Select the word count
  • Then, rank the word within the corpus, descending rank
  • Find the rank within that corpus

So, we are ranking the most common words (rank according to word count) for each corpus

Date and Time Functions

BigQuery uses epoch time, returns values based on UTC time zone (default)

Many functions for manipulation of date and time stamps:

  • TIMESTAMP data type
  • DATE(yr, mo, day) - can construct a DATE from numerical values
  • DATE(timestamp) - can extract the date from a timestamp (supports timezones)
  • DATETIME(date, time) - convert DATE and TIME objects into single DATETIME object
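
A few one-line illustrations of these (standard SQL; the values are arbitrary):

SELECT
    DATE(2014, 9, 1)                              AS from_parts,      -- construct a DATE from numbers
    DATE(TIMESTAMP '2014-09-01 09:00:00+00')      AS from_timestamp,  -- pull the date out of a TIMESTAMP
    DATETIME(DATE '2014-09-01', TIME '09:00:00')  AS combined         -- DATE + TIME -> DATETIME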

User Defined Functions

Standard UDFs are scalar

Useful for things like non-trivial string parsing

Define the function before you start with the query, then call the function on the fields being extracted

SELECT val, addFourAndDivide(val, 2)

Here's the full query with function definition:

CREATE TEMPORARY FUNCTION
    addFourAndDivide( x INT64, y INT64) AS ((x+4)/y);

WITH numbers AS
    (
        SELECT 1 as val
        UNION ALL
        SELECT 3 as val
        UNION ALL
        SELECT 4 as val
        UNION ALL
        SELECT 5 as val
    )
SELECT
    val, 
    addFourAndDivide(val,2) AS result
FROM 
    numbers;


Can go further, and define temporary functions in another language (Javascript)

Example:

CREATE TEMPORARY FUNCTION 
    multiplyInputs(x FLOAT64, y FLOAT64)
RETURNS
    FLOAT64
LANGUAGE
    js AS """
    return x*y;
""";

WITH numbers AS (
        SELECT 1 AS x, 5 AS y
        UNION ALL
        SELECT 2 AS x, 10 AS y
        UNION ALL
        SELECT 3 AS x, 15 AS y
    )
SELECT 
    x, y, multiplyInputs(x,y) AS product
FROM
    numbers

Components:

  • CREATE TEMP FUNCTION
  • RETURNS [data type]
  • LANGUAGE [language]
  • AS [external code]

Constraints?

  • UDFs must return less than 5 MB
  • Only 6 concurrent UDFs can be run at a time
  • Native Javascript functions not supported
  • JavaScript only handles the most significant 32 bits for bitwise operations
  • Query jobs can only define maximum 50 Javascript UDF resources
  • Each inline blob limited to 32 KB
  • Each external code resource limited to 1 MB

Lab: write a query and run it from Java or Python.

Query will examine what languages are used on Github checkins on weekends


Performance

Less work means faster (and cheaper) query

What is work?

  • I/O - number of bytes read
  • Shuffle - how many bytes passed to next stage
  • Materialization - holding data/bytes in memory, versus writing them out
  • CPU overhead associated with functions being called (SUM/COUNT are cheap, SIN/COS are more expensive)

To improve performance:

Columns:

  • BigQuery is a column-based database, so limit query only to the columns you need
  • More columns = more network, more IO, more materialization
  • Avoid SELECT *
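
For example, against the flights table used throughout, name only the columns you need:

SELECT
    airline,
    departure_delay    -- only the columns this analysis actually needs
FROM
    `bigquery-samples.airline_ontime_data.flights`

A SELECT * on the same table would read (and bill for) every column.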

Filter early and often:

  • Use WHERE clauses (as early as possible) to filter data and reduce the amount of data passed between stages of the query
  • The more rows are processed, the more data is processed, the slower the query

Joins:

  • When doing a JOIN, you want to do the biggest joins first, and smaller joins later (this reduces the amount of data traveling through your query)

Low cardinality GROUP BYs

  • When you are grouping, think about how much data you are grouping
  • Low cardinality keys and groups are grouped faster (i.e., fewer bags)
  • High cardinality groups are slower (i.e., more bags)
  • You should get a count of how many groups you have, and about how many rows there are per key, in order to best understand the performance of your query

Example: how many commits per language

  • Javascript - high cardinality key
  • Haskell - low cardinality key
  • WHERE clause can help you avoid processing keys
  • HAVING num_commits > 100
  • This will reduce tail latency (lots and lots of small groups)
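
A minimal sketch of this pattern - the table and columns here are hypothetical, the point is the HAVING filter on group size:

SELECT
    language,
    COUNT(*) AS num_commits
FROM
    `myproject.mydataset.commits`    -- hypothetical table
GROUP BY
    language
HAVING
    num_commits > 100                -- drop the long tail of tiny groups
ORDER BY
    num_commits DESC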

Functions:

  • Built-in functions are faster than UDFs, so try to use SQL functions
  • SQL UDFs are slightly faster than Javascript UDFs
  • BigQuery also supports approximate functions
  • Example: APPROX_COUNT_DISTINCT is much faster than COUNT(DISTINCT)
  • Approximate means, within 1% of the actual number
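
For example, comparing the exact and approximate distinct counts on the flights table from earlier:

SELECT
    COUNT(DISTINCT airline)        AS exact_count,
    APPROX_COUNT_DISTINCT(airline) AS approx_count   -- much cheaper on large tables
FROM
    `bigquery-samples.airline_ontime_data.flights`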

Ordering:

  • Only apply an ORDER on the outermost query
  • Usually makes no sense to ORDER an inner query
  • Filter first, then order
  • Otherwise, you're sorting things that you don't need to sort

Wildcard tables:

  • Querying multiple tables using wildcards
  • Number of tables affects performance
  • Example: "FROM `bigquery-public-data.noa_gsod.gsod*`

Table partitioning:

  • Can partition a table by timestamp
  • Specify the column that contains timestamp
  • Partition, behind the scenes, on a particular timestamp
  • Gives you same performance as a sharded table (reducing query to a limited number of shards)

Example:

SELECT ...
FROM `sales`
WHERE _PARTITIONTIME
    BETWEEN TIMESTAMP("2016-01-01")
    AND TIMESTAMP("2016-01-01")

While querying a partitioned table works essentially the same way as querying a sharded table, keep in mind that the partitions are NOT as obvious to outsiders - it just looks like one big table

Partitioning is much cleaner (one table instead of many), but it imposes a cognitive load on everyone writing queries: they have to remember to filter on the partitioning column (_PARTITIONTIME) to get the benefit


Optimize your queries, but also measure and monitor

  • Per-query explain plans: what did the query do?
  • Monitoring with Stackdriver: what's happening with resources?

BigQuery explanations:

  • Breakdown, stage by stage, of timing, input/output rows, etc.
  • Wait/Read/Compute/Write breakdowns

Identifying skew:

  • The other portion of the BigQuery explanation shows the ratio of maximum to average times for Wait/Read/Write/Compute
  • Skew means, maximum is much larger than average
  • This tells you some things are taking longer than average to process

Plans and categories:

  • Look for stages where you have significant skew, and improve the tail skew (lots of small groups)
  • Add conditions (WHERE x) to filter out more records
  • Use APPROX_TOP_COUNT to check
  • Filter early
  • If most time spent on CPU tasks, use approximate functions, examine UDF usage, filter earlier to reduce number of rows being operated on
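
For example, APPROX_TOP_COUNT gives a quick view of which group keys dominate (using the Shakespeare sample from earlier):

SELECT
    APPROX_TOP_COUNT(corpus, 5) AS top_corpora    -- the 5 most frequent corpus values and their counts
FROM
    `publicdata.samples.shakespeare`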

BigQuery Stackdriver monitoring:

  • Can show different metrics in a dashboard
  • Slot utilization, queries in flight, uploaded bytes, stored bytes, etc.

Pricing

What's free?

  • Loading
  • Exporting
  • Queries on metadata (number of rows, names of columns, etc.)
  • Cached queries (per user)
  • Queries with errors

What's not free?

  • Storage
  • Compute

Storage costs:

  • Amount of data stored
  • Rate of ingestion of data
  • Older data gets a discount

Processing costs:

  • On-demand plans depend on amount of data queried/processed
  • Flat rate plans do not
  • 1 TB/month free
  • Can run high-compute queries, but have to opt in, special charge

Pricing info: http://cloud.google.com/bigquery/pricing

Client libraries: http://cloud.google.com/bigquery/client-libraries

Resources

Google

BigQuery basics - code lab: https://codelabs.developers.google.com/codelabs/cpb101-bigquery-query/#0

BigQuery data - code lab: https://codelabs.developers.google.com/codelabs/cpb101-bigquery-data/#0

Airports data (for code lab): https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/airports.csv

Link to bq command line tool documentation: https://cloud.google.com/bigquery/bq-command-line-tool

Link to BigQuery SQL reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/

Pricing info: http://cloud.google.com/bigquery/pricing

Client libraries: http://cloud.google.com/bigquery/client-libraries

Other

Nice explanation of inner vs outer joins: http://www.programmerinterview.com/index.php/database-sql/inner-vs-outer-joins/

charlesreid1 github repo for BigQuery practice: https://github.com/charlesreid1/sabermetrics-bigquery

Baseball data set: https://cloud.google.com/bigquery/public-data/baseball
