
Serverless Data Analysis with Dataflow

Module 2: Data Processing Pipelines with Dataflow

What Is Dataflow

Dataflow:

  • Way to execute data processing pipelines on the cloud
  • Flexible sources/sinks (e.g., read from BigQuery and write to Cloud Storage)
  • Steps - transforms - are elastic, can be scaled to more machines as needed
  • Code is written using open source API (Apache Beam)
  • Cloud Dataflow is the Apache Beam "pipeline service"
  • Other Apache Beam pipeline services: Flink, Spark
  • Example: read from GCS, perform filtering, perform grouping, perform transform, then write results to GCS

Each step: user-defined code (Java or Python classes)

ParDo ("parallel do") - runs a particular transform on each element of a collection in parallel

Why Dataflow?

  • Batch or streaming
  • Cloud Storage - batch data (e.g., historical data) source
  • Cloud PubSub - streaming source
  • Can use the SAME PIPELINE for both scenarios
  • Can have Dataflow write to various sinks
  • BigQuery - batch results storage sink
  • Cloud Storage - batch results storage sink
  • PubSub - streaming results sink

For streaming cases:

  • Define a sliding window for streaming data
  • Change input and output to read from an UNBOUNDED source
  • Then define a window, e.g., 60 minutes
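A minimal Python sketch of that idea (the topic name and window sizes here are just placeholders, not from the course):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import SlidingWindows

TOPIC = "projects/my-project/topics/my-topic"   # hypothetical topic

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

p = beam.Pipeline(options=options)
(p
 | "Read"   >> beam.io.ReadFromPubSub(topic=TOPIC)        # UNBOUNDED source
 | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))  # Pub/Sub payloads arrive as bytes
 | "Window" >> beam.WindowInto(SlidingWindows(size=60*60, period=5*60))  # e.g. 60-minute window
 | "Words"  >> beam.FlatMap(lambda line: line.split())
 | "Pair"   >> beam.Map(lambda w: (w, 1))
 | "Count"  >> beam.CombinePerKey(sum)                    # aggregated per window, not globally
)
p.run()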

Data Pipelines

Can write pipelines in Java or Python

Concepts:

  • Pipeline - set of steps (transforms)
  • The pipeline is executed on the cloud by a runner
  • Apache Beam code forms the pipeline, Dataflow is the runner
  • Each step is elastically scaled
  • Source - where the input data comes from
  • Sink - where the transformed data goes

PCollection:

  • Each transform in the pipeline takes a parallel collection (PCollection) as input
  • PCollection - a list or map of items that does not need to be bounded by the size of the machine, and does not need to fit into memory
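For intuition, a PCollection behaves like a deferred, distributed collection rather than a local list. A tiny Python sketch (the in-memory beam.Create source is just for illustration; real pipelines usually read from a source such as GCS):

import apache_beam as beam

with beam.Pipeline() as p:
    # A PCollection: the runner distributes these elements; they are never held as one local list
    lines = p | "Create" >> beam.Create(["line one", "line two", "line three"])
    lengths = lines | "Lengths" >> beam.Map(len)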

Pipeline:

  • Directed graph of steps
  • Read in data, transform it, write data out
  • Example Java pipeline:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
    // Create pipeline 
    // Parameterize with input args
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.Read.from("gs://..."))   // Read the input
     .apply(new CountWords())               // Count (process) the text
     .apply(TextIO.Write.to("gs://..."));   // Write output to GCS

    // Now run the pipeline
    p.run();
}

p.run() submits the pipeline "graph" to the runner, which then executes it

Direct runner - runs the pipeline on a single instance of the local machine

Dataflow runner - graph gets launched on the cloud

Python API: similar feel...

import sys
import apache_beam as beam

if __name__=="__main__":
    # Create pipeline 
    # Parameterize on input args
    p = beam.Pipeline(argv=sys.argv)

    (p
        | beam.io.ReadFromText("gs://...")              # Read input
        | beam.FlatMap(lambda line: count_words(line))  # Process
        | beam.io.WriteToText("gs://...")               # Write output
    )

    p.run()     # Run the pipeline

Python uses the pipe operator to carry out transforms in sequence.

Step 1: create graph

Step 2: run it

PCollections

Input to transform: PCollection

Output from transform: PCollection

All data in pipeline is represented with a PCollection

// Java:
PCollection<String> lines = p.apply(...);

We can also define a transform to happen within a ParDo context, which will parallelize the transform, by defining a DoFn

Above, we define a collection of Strings called lines.

Below, we perform a transform for each line (each String in the collection called lines)

PCollection<Integer> sizes = 
    lines.apply("Length",
                ParDo.of(new DoFn<String, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String line = c.element();
                        c.output(line.length());
                    }
                }
            ));

Above - an anonymous class that extends DoFn and defines the processElement() method; the result is a collection of Integers that you can then transform in the next step

In Python:

lines = p | ...

Now, for every line that comes in, return the length of the line:

sizes = lines | "Length" >> beam.Map( lambda line : len(line))

This name is important - shows up in the monitoring console

Dataflow allows you to replace parts in a pipeline, WITHOUT ANY LOSS OF DATA (any data not processed by old pipeline will be processed by new pipeline)

But for that exchange of transforms to work, they need to have unique names

In Python, the >> operator is overloaded so that "Length" >> beam.Map(...) means "call this step Length and have it perform a Map operation"

Executing Pipelines

To process data with pipelines, need to read data into pipelines

Can read input data from GCS, BigQuery, PubSub

Will look at a few example pipelines, first in Java, then in Python.

Java Pipelines

Example of reading text into a String:

// Note the wildcard syntax
PCollection<String> lines = p.apply(TextIO.Read.from("gs://.../input-*.csv.gz"));

Example of reading data from PubSub:

PCollection<String> lines = p.apply(PubsubIO.Read.from("input_topic"));

Example of running a BigQuery query and returning a table row:

String javaQuery = "SELECT x, y, z FROM [project:dataset.tablename]";
PCollection<TableRow> javaContent = p.apply(BigQueryIO.Read.fromQuery(javaQuery));

The last example returns a PCollection of TableRow objects
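The Python counterpart is similar; a sketch using Beam's BigQuerySource, which yields each row as a Python dict instead of a TableRow:

import sys
import apache_beam as beam

query = "SELECT x, y, z FROM [project:dataset.tablename]"   # legacy SQL, as in the Java example

p = beam.Pipeline(argv=sys.argv)
rows = (p
    | "ReadBQ" >> beam.io.Read(beam.io.BigQuerySource(query=query))
    | "GetX"   >> beam.Map(lambda row: row["x"]))           # each row arrives as a dict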

Likewise with sinks - whatever you can read, you can also write

Write data to file system, GCS, BigQuery, or PubSub

lines.apply(TextIO.Write.to("/data/output").withSuffix(".txt"));

If the output files are very small, and you don't want to deal with the hassle of sharding, can also say:

.apply(TextIO.Write.to("/data/output").withSuffix(".csv").withoutSharding());

(Note that this requires all I/O to happen on a single machine)

This may require transformation of relevant data from (whatever type) to String before writing out using TextIO (which only accepts Strings)
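A rough Python analogue (the sizes collection here is hypothetical); converting explicitly to strings keeps the output format under your control:

import apache_beam as beam

with beam.Pipeline() as p:
    sizes = p | beam.Create([3, 11, 42])                  # non-string elements
    (sizes
     | "ToString" >> beam.Map(str)                        # convert before writing
     | "Write"    >> beam.io.WriteToText("/tmp/output", file_name_suffix=".txt"))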

To execute your pipeline, two options:

  • Option 1: run Java, pass it the classpath, give it the name of the file/class with the main method
  • Option 2: run using maven

Option 2 looks like:

mvn compile -e exec:java -Dexec.mainClass=${MAIN}

And to run in the cloud, use mvn to submit the job to Dataflow

mvn compile -e exec:java \
-Dexec.mainClass=$MAIN \
-Dexec.args="--project=$PROJECT \
--stagingLocation=gs://$BUCKET/staging/ \
--tempLocation=gs://$BUCKET/staging/ \
--runner=DataflowRunner"

If using Java version of Dataflow, use Maven

  • Default is to use a local runner
  • Can add arguments to specify project (b/c this controls billing), staging location (where to stage code), temporary location (optional), and runner to use

Python Pipelines

Python pipeline execution:

Running locally just requires running the program without args:

python ./my-grep.py

To run in the cloud, specify parameters:

python ./my-grep.py \
--project=$PROJECT \
--job_name=myjob \
--staging_location=gs://$BUCKET/staging/ \
--temp_location=gs://$BUCKET/staging \
--runner=DataflowRunner

Only difference is that Python requires a job name...

So far, we looked at what Dataflow is and some simple Dataflow concepts

Now we will write/implement a Dataflow pipeline

Data Pipelines Lab

Serverless Data Pipeline in Java

Start by running the command to generate a local Maven prototype project for a Dataflow pipeline:

$ mvn archetype:generate \
  -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-starter \
  -DarchetypeGroupId=com.google.cloud.dataflow \
  -DgroupId=com.example.pipelinesrus.newidea \
  -DartifactId=newidea \
  -Dversion="[1.0.0,2.0.0]" \
  -DinteractiveMode=false

This creates a new Maven project structure, with a ready-to-go barebones class that contains the basic structure of a Pipeline. (This comes from the archetype artifact, the Dataflow starter.)

Next, let's actually use some pre-built pipelines.

$ git clone https://github.com/GoogleCloudPlatform/training-data-analyst

This repo contains some pre-built pipelines.

Now add Java to the path, so that we can compile and run this pipeline (this uses Apache Beam, so there is a whoooole bunch of libraries that are downloaded by Maven before this runs).

This builds and runs the pipeline locally:

$ export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH

$ mvn compile -e exec:java \
  -Dexec.mainClass=com.google.cloud.training.dataanalyst.javahelp.Grep

[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 33.430 s
[INFO] Finished at: 2017-10-10T21:08:52-07:00
[INFO] Final Memory: 37M/91M
[INFO] -----------------------------------------------------------------------

$

Now, we can run the pipeline on the cloud.

First, we copy some of the code over to Google Cloud Storage. The reason we do this is to illustrate how we would perform a grep on files that are sitting in Google Cloud Storage. (This is just an arbitrary example...)

The Grep program here is searching for each instance of the word "import" in this set of Java files.

We copy it over to Google cloud storage using the gsutil:

$ gsutil cp src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://charlesreid1-dataflow/javahelp

Next, we modify the local copies of the Java files (these local files are the ones that are actually compiled and run on the Google Cloud Dataflow platform; the copies we just put on Google Cloud Storage are only the data being searched, so they don't need any modification).

We modify them by specifying an input file and an output file that are located ON Google Cloud storage, specifying a URL in the form of gs://<path-to-java-files>.

Now, we can use the run_oncloud script, which is basically just a call to Maven that looks similar to the call above, but with a few extra specifications:

$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep

project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/target/classes
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_27_30-5002896350771966928
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.808 s
[INFO] Finished at: 2017-10-10T21:27:31-07:00
[INFO] Final Memory: 44M/188M
[INFO] ------------------------------------------------------------------------

$ 

Now that the Dataflow job is submitted, we can go to the Google Cloud console and monitor Dataflow jobs from there. The first result we saw was a failed job:

 (abd61e2c1e6bfed1): Workflow failed. Causes: (abd61e2c1e6bf6e2): There was a problem refreshing your credentials. Please check:
1. Dataflow API is enabled for your project.
2. There is a robot service account for your project:
service-[project number]@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to your project. If this account does not appear in the permissions tab for your project, contact Dataflow support.

To fix this:

  • Click the hamburger menu (right-hand side menu in Console)
  • Click APIs
  • The Dataflow API appeared to be already enabled (but, apparently NOT)
  • Clicked the + sign at the top to "Enable APIs"
  • This brought up a search bar
  • Searched for Dataflow API
  • This pulled up the Dataflow API and a blue Enable button
  • Clicked the Enable button
  • Now there was a green check mark next to the Dataflow API that was not there before

Then I resubmitted my job through Maven.

$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep

project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_34_37-14825590390669419114
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.268 s
[INFO] Finished at: 2017-10-10T21:34:37-07:00
[INFO] Final Memory: 33M/79M
[INFO] ------------------------------------------------------------------------

$ 


This time, opening the job in the Cloud Console, I saw a couple of boxes with green check marks.

Everything is doing just fine.

Succeeded in 2 minutes 47 seconds.

Also, region was us-central1 (no idea how to set that at submit time; presumably via the --region pipeline option.)

To verify the job worked, cat the results of the program (which are in output.txt):

$ gsutil cat gs://charlesreid1-dataflow/javahelp/output.txt

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
 * A dataflow pipeline that finds the most commonly imported packages
                final String keyword = "import";
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
                final String searchTerm = "import";
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
 * imports elsewhere) (b) needs help (count the number of times this package has
                final String keyword = "import";
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
        // e.g: import java.util.List; --> java.util.List, java.util, java


Serverless Data Pipeline in Python

Link to Python lab files: https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/data_analysis/lab2/python/install_packages.sh

Start by running the install script to update pip:

$ sudo ./install_packages.sh

Then inspect the grep.py script:

$ cat grep.py

#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import re
import sys

def my_grep(line, term):
   if re.match( r'^' + re.escape(term), line):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run()

Execute it locally:


$ python grep.py
Traceback (most recent call last):
  File "grep.py", line 16, in <module>
    import apache_beam as beam
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/__init__.py", line 78, in <module>
    from apache_beam import io
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/__init__.py", line 21, in <module>
    from apache_beam.io.avroio import *
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 29, in <module>
    from apache_beam.io import filebasedsource
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 33, in <module>
    from apache_beam.io.filesystems import FileSystems
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 31, in <module>
    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsfilesystem.py", line 27, in <module>
    from apache_beam.io.gcp import gcsio
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py", line 36, in <module>
    from apache_beam.utils import retry
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 38, in <module>
    from apitools.base.py.exceptions import HttpError
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/__init__.py", line 21, in <module>
    from apitools.base.py.base_api import *
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 31, in <module>
    from apitools.base.protorpclite import message_types
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/message_types.py", line 25, in <module>
    from apitools.base.protorpclite import messages
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
    class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
    metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases


$ sudo pip install six==1.10.0

Collecting six==1.10.0
  Downloading six-1.10.0-py2.py3-none-any.whl
Installing collected packages: six
  Found existing installation: six 1.11.0
    Uninstalling six-1.11.0:
      Successfully uninstalled six-1.11.0
Successfully installed six-1.10.0

$ python grep.py

No handlers could be found for logger "oauth2client.contrib.multistore_file"
/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)

$ # also see https://stackoverflow.com/questions/46300173/import-apache-beam-metaclass-conflict

Tried to prepare a pull request to add the six version pin to the install_packages.sh script.

Turns out someone already fixed it. Oooookay.

Now copy over Java files to GCP (now I see, these are just copied there so we have something to look at):

$ gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://

Edit the script grepc.py to set the bucket and project names.

Submit the job:

$ python grepc.py

Now go to Cloud Console and check on the Dataflow jobs. The job runs okay.

Job succeeded in 4 minutes 32 seconds.

$ gsutil cat gs://charlesreid1-dataflow/output* 

dumps out everything you'd expect.

MapReduce with Dataflow

When we take the MapReduce idea and apply Dataflow to it:

  • Data is split into chunks
  • Each chunk is sent to a node
  • The request, or job, contains "map" instructions
  • Maps are sent to each node holding data
  • The map is executed on the data on the node
  • Finally, the reduce operation gathers/combines the data onto a single node

Map operations - highly parallelizable

Reduce operations - computations performed on a large number of rows

Example: each node holds a tax return and computes a return amount; then the reduce operation accumulates state-by-state statistics

Tax return example:

  • Data: all tax returns
  • Each node: processes a few tax returns
  • Each tax return: processed by map node
  • Reduce operation: computing a marginal tax rate
  • Reduce nodes may process one or two states

Dataflow: ParDo (parallel do)

  • ParDo acts on one item at a time
  • Similar to Map in MapReduce
  • Multiple instances of a ParDo class on many machines, so should be stateless
  • That is, map operations operate statelessly on the data; they do not "remember" things
  • Aggregation (e.g., a time-average) should be done by the reduce steps, not by the map steps
  • Useful for certain kinds of operations:
  • Filtering (choosing which inputs to emit for the next step of the pipeline/computation)
  • Converting one type to another (parallel collection of floats to parallel collection of strings)
  • Extracting parts of an input (one particular field of a table row, e.g., extract Box 17 from a form)
  • Combining input values in a calculation (combine different parts of a document to calculate a percentage of taxes)

MapReduce Transforms

Now, let's cover how these map operations actually look. There are two basic kinds of mapping operations:

  • One-to-one map operations
  • One-to-many map operations

These are treated differently in Python versus Java.

Python Transforms

Python:

  • Can use a Map or a FlatMap
  • Map - one-to-one relationship between input and output (every input will result in some kind of output)
  • FlatMap - one-to-many relationship between input and output (usually with generator)
  • Generators or filters - we are choosing whether to return something or not; we may end up returning many things

Here is what a Map looks like, in code:

'WordLengths' >> beam.Map( lambda word : (word, len(word) ) )

And here is a FlatMap, in code, applying an externally-defined generator (note yield keyword):

def my_grep(line, term):
    if term in line:
        yield line

'grep' >> beam.FlatMap( lambda line : my_grep(line, searchTerm) )
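To make the contrast concrete, a small self-contained sketch (hypothetical data; my_grep is the same generator as above) showing one-to-one versus one-to-many behavior:

import apache_beam as beam

def my_grep(line, term):
    if term in line:
        yield line

with beam.Pipeline() as p:
    lines = p | beam.Create(["import os", "x = 1", "import sys"])

    # Map: exactly one output element per input element
    lengths = lines | "WordLengths" >> beam.Map(lambda line: (line, len(line)))

    # FlatMap: zero, one, or many output elements per input element
    imports = lines | "grep" >> beam.FlatMap(lambda line: my_grep(line, "import"))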

Java Transforms

Java:

  • In both cases (one-to-one or one-to-many), use apply with a ParDo

We already covered an example of a flat map (returning zero, one, or multiple outputs for a single input).

As mentioned, we use apply in any case, whether 1:1 or not.

Here, we call apply, name the step "Grep", and into it, we pass a ParDo object and a function.

        p
                .apply("GetJava", TextIO.read().from(input)) //
                .apply("Grep", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String line = c.element();
                        if (line.contains(searchTerm)) {
                            c.output(line);
                        }
                    }
                })) //
                .apply(TextIO.write().to(outputPrefix).withSuffix(".txt").withoutSharding());

Likewise,

PCollection<Integer> sizes = 
    lines.apply("length", ParDo.of(new DoFn<String, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String line = c.element();
            c.output(line.length());
        }
    }));

Continuing with the tax example:

PCollection<Double> rates =

   taxReturns.apply("TaxStuff", ParDo.of(new DoFn<TaxDocument, Double>() {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
           TaxDocument doc = c.element();
           // ........do some tax return processing here..........
           c.output(doc.getTaxReturn());
       }
   }));

This returns a PCollection of doubles.

How would we aggregate these to compute an average tax return rate by state?

  • We have a whole bunch of tax returns, and from each we compute a whole bunch of tax rates
  • Now we want to organize these by state, and compute an average return rate by state
  • What do we want to do differently?
  • Instead of returning a rate, and getting a PCollection of Doubles, we want to return a key-value pair (key = state, value = tax return)

Break down the steps:

  • Step 1: create a Key-Value pair in a ParDo
  • Step 2: group by key
PCollection<KV<String,Double>> rateAndState = p.apply( 
    ParDo.of(
        new DoFn<TaxDocument, KV<String, Double>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {

                // Each element of the process context will be a TaxDocument object,
                // So process it however we need to...
                String[] fields = c.element().turnTaxDocumentIntoStringArray();

                //  .......... process the tax return doc ..........
                String state = fields[0];

                // ........... get tax return rate .............
                Double taxRate = getTaxRate(fields);
                c.output(KV.of(fields[0], taxRate));
            }
        }
    ));

PCollection< KV<String, Iterable<Double>> > grouped = rateAndState.apply( 
    GroupByKey.<String, Double>create() );

Performing a group by allows us to perform computations BY STATE...

  • We emit a key value pair - key is state, value is tax return
  • Then, we do a group-by
  • PCollection of key-value pairs are all jumbled up, so Dataflow groups the key-value pairs by key
  • That's what the GroupByKey function does.

Note that if you want to do something WITH the groups, you don't necessarily need the GroupBy explicitly.

First, an example of applying an aggregate function to a PCollection:

// Build a Dataflow pipeline to compute sales amounts
PCollection<Double> salesAmounts = ...;

// Now, we can apply a function to sum up all of the values in the PCollection
PCollection<Double> totalAmt = salesAmounts.apply( Combine.globally(new Sum.SumDoubleFn()) );

Combine.globally - we have to specify HOW we want it to combine things globally... So we give it a function to use to combine everything.

Now an example of applying an aggregate function, by key group, to a PCollection of key-value pairs:

// Build a Dataflow pipeline to compute sales amounts, keyed by salesperson
PCollection<KV<String, Double>> salesRecords = ...;

// Now, sum up sales by person - this implicitly does a GroupBy
PCollection<KV<String, Double>> totalSalesByPerson = 
                salesRecords.apply( 
                    Combine.<String, Double, Double>perKey(
                        new Sum.SumDoubleFn())
                );

We don't want to combine GLOBALLY this time, we want to combine PER KEY. Again, we have to specify HOW we want to combine these records together...

// Alternatively, do an explicit group by key
// (note that GroupByKey yields an Iterable of values per key, not a single Double)
PCollection<KV<String, Iterable<Double>>> groupedSales = 
                salesRecords.apply( 
                    GroupByKey.<String, Double>create()
                );

If we did want to group things by key, and do something other than apply sum or mean or min or max to each group, we would replace the "Combine" with a "GroupByKey", because our verb has changed - we're no longer trying to combine records (aggregate records), we're now trying to group records by key.

GroupByKey returns an Iterable object - which we can then iterate through to sum up, or to do whatever else we happen to want to do.

Many other functions available:

  • Sum, Mean, etc.

This raises a natural question:

  • Do we do a combine per key?
  • Or do we group by each key, and then sum them up ourselves?
  • Let Dataflow do the optimization - it knows best how to group by key and count things up.
  • Use a Combine when possible, instead of a GroupByKey
  • If there is already a combine, and you're doing a simple function, don't use group by key and do it yourself
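A Python sketch of the same trade-off, with hypothetical sales records; both branches produce the same per-key totals, but the CombinePerKey version lets Dataflow optimize the aggregation:

import apache_beam as beam

with beam.Pipeline() as p:
    sales = p | beam.Create([("alice", 10.0), ("bob", 5.0), ("alice", 2.5)])

    # Preferred: let the runner combine per key
    totals = sales | "SumPerKey" >> beam.CombinePerKey(sum)

    # Alternative: explicit group-by-key, then iterate over each key's values yourself
    totals2 = (sales
               | "Group" >> beam.GroupByKey()
               | "SumValues" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))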

MapReduce Lab

Lab: specify/use command line options, and implement map/reduce operations in Dataflow

Link to lab: https://codelabs.developers.google.com/codelabs/cpb101-mapreduce-dataflow/#2

Link to repo: https://github.com/GoogleCloudPlatform/training-data-analyst

Java

Start by looking at IsPopular.java

Collects input files from source code of this project (javahelp), stringifies them, searches for import statements, and counts them all up.

Questions:

What getX() methods are present in the MyOptions class?

  • getOutputPrefix
  • getInput

What is default output prefix?

  • /tmp/output

How is the variable outputPrefix in main() set?

  • using accessor method setOutputPrefix()

What are the key steps in the pipeline?

  • GetJava step - I/O, reads all Java files from disk
  • GetImports - (parallel) processes each element, checks if line starts with keyword "import"
  • PackageUse - (parallel) assemble a String-Integer Key-Value map - each package is the key, and the value is 1
  • Sum - sum up values by key
  • Top5 - order the keys by value, and take the top 5 from that list
  • ToString - (parallel) process each key-value pair and put it in a string buffer
  • Everything ultimately ends up being sent to c.output()

Which steps happen in parallel?

  • GetImports step - looking for lines starting with "import"
  • PackageUse step - assembling string-integer key value maps to count up the number of occurrences of each package that is imported
  • ToString step - putting key-value pair into string buffer and printing to output

Which steps are aggregations?

  • The Sum.integersPerKey() step is a sum aggregation of each key's counts


Now, assemble the Java pipeline using maven.

Start by setting the path:

export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH

Then compile the project class:

mvn compile -e exec:java \
 -Dexec.mainClass=com.google.cloud.training.dataanalyst.javahelp.IsPopular

What happens?

  • Lots of packages are downloaded
  • Build succeeds
  • Program is run
  • Output is in /tmp/output.csv
$ cat /tmp/output.csv
org,45
org.apache.beam,44
org.apache,44
org.apache.beam.sdk,43
org.apache.beam.sdk.transforms,16

Note the heart of the extraction process is the getPackages() method:

  • Start position is where "import" keyword ends
  • End position is where semicolon ; is
  • Call splitPackageName() on the contents
  • splitPackageName() will turn com.example.appname.library.widgetname into "com", "com.example", "com.example.appname", etc.
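A plain-Python sketch of what a splitPackageName-style helper does (the name and behavior are inferred from the description above, not copied from the lab code):

def split_package_name(package_name):
    """'com.example.appname' -> ['com', 'com.example', 'com.example.appname']"""
    parts = package_name.split(".")
    return [".".join(parts[:i]) for i in range(1, len(parts) + 1)]

# split_package_name("com.example.appname.library.widgetname")
# -> ['com', 'com.example', 'com.example.appname', 'com.example.appname.library', ...]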


A word on command line options:

  • Class is using PipelineOptionsFactory to get options from command line
  • Allows us to pass in args on the command line and read them back through the corresponding getter methods

Now, re-run the package with maven, this time passing args on the command line:

Output prefix command line option is called outputPrefix because we have a method called getOutputPrefix

--outputPrefix=/tmp/myoutput

If we rename the method to get the output prefix to getMyOutputPrefix, then the command line option should change accordingly:

--myOutputPrefix=/tmp/myoutput

puts output in /tmp/myoutput.csv

Note the PipelineOptionsFactory is part of Apache Beam

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

Link to PipelineOptions documentation: https://beam.apache.org/documentation/sdks/javadoc/0.2.0-incubating/org/apache/beam/sdk/options/PipelineOptions.html


Python

Start with is_popular.py

This does the same thing as the Java pipeline code - searches the Java source code for "import" statements

Open /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/python

Now install the packages that are needed:

$ sudo ./install_packages.sh

which basically just wraps two pip packages: pip install google-cloud-dataflow oauth2client=2.0

Now let's examine the script:

import apache_beam as beam
import argparse

Similar to Java package, we want to be able to specify some values on the command line.

We define a few functions:

  • splitPackageName()
  • getPackages()
  • packageUse()
  • by_value() (comparator function - returns True if left < right)

Finally, in the main method, we have:

  • Argument parser to get any input arguments
  • Apache Beam pipeline
  • Create input
  • Assemble pipeline:
  • GetJava - reads from text
  • GetImports - runs a startswith() check via a lambda function
  • PackageUse - runs packageUse() function via lambda function
  • TotalUse - does beam.CombinePerKey(sum)
  • Top5 - does a beam.transforms.combiners.Top.Of(5, by_value)
  • Write - does a beam.io.WriteToText(output_prefix)

Finally, it runs the pipeline.

charlesreid1@not-all-broken:~/training-data-analyst/courses/data_analysis/lab2/python$ ./is_popular.py

charlesreid1@not-all-broken:~/training-data-analyst/courses/data_analysis/lab2/python$ cat /tmp/output-00000-of-00001
[(u'org', 45), (u'org.apache.beam', 44), (u'org.apache', 44), (u'org.apache.beam.sdk', 43), (u'org.apache.beam.sdk.transforms', 16)]

Now we can pass an argument - add --output_prefix to the command that is being run to modify the output file location/name:

charlesreid1@not-all-broken:~/training-data-analyst/courses/data_analysis/lab2/python$ ./is_popular.py --output_prefix=/tmp/myoutput

charlesreid1@not-all-broken:~/training-data-analyst/courses/data_analysis/lab2/python$ ls -l /tmp/myoutput*
-rw-r--r-- 1 charlesreid1 charlesreid1 133 Oct 16 01:30 /tmp/myoutput-00000-of-00001

Nicely done. All finished.

Side Inputs

Side inputs and in-memory objects:

We talked about a canonical form of processing in a data pipeline - we read data in from a source, we do some transforms, we write data out to a sink.

However, in reality we often need additional information during the transform process. Maybe you need external data - don't have all the information you need to process the tax form.

Suppose you have in-memory object that will not change - something fixed. Those can be provided as constructor parameters to each transform. Example: when we do the grep transform, we can say, "make the search term a String parameter."

public class Match extends DoFn<String, String> {
    
    private final String searchTerm;

    public Match(String searchTerm) { 
        this.searchTerm = searchTerm;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        String line = c.element();
        if(line.contains(searchTerm)) {
            c.output(line);
        }
    }
}

And later in the code, when we are actually assembling the pipeline, we can provide the grep term at the time of pipeline assembly:

p.apply("Grep", ParDo.of( new Match(searchTerm) ) );

This is not calculated dynamically - it's just provided, it's fixed in memory.
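The same pattern in Python: a DoFn (or an extra argument to Map/FlatMap) can carry the fixed value. A sketch:

import apache_beam as beam

class Match(beam.DoFn):
    def __init__(self, search_term):
        self.search_term = search_term   # fixed at pipeline-construction time

    def process(self, line):
        if self.search_term in line:
            yield line

# At assembly time:
#   lines | "Grep" >> beam.ParDo(Match(search_term))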

But SOMETIMES, you need an entirely separate PCollection (a second data source) to process the data from one source (one PCollection).

If you need a second PCollection, the procedure is as follows:

  • Take the other source (the smaller of the two) and convert it into a view
  • Can either be a list or a map - list if PCollection of objects, map if PCollection of key-value pairs
  • We turn the PCollection into a PCollectionView
  • This can now be provided as a side input
  • The main input is the first PCollection
  • The side input is the second PCollection - pass the view of that PCollection in as a side input
  • ParDo.of becomes ParDo.withSideInputs

Inside of the ParDo that has the side input, you can now access the side inputs via the process context, c.sideInput()

// Convert PCollection to a View (asList, asMap)
PCollection<KV<String, Integer>> cz = ...
PCollectionView<Map<String, Integer>> czmap = cz.apply("ToView", View.asMap());

// Call the ParDo with side input[s]
.apply("...", ParDo.withSideInputs(czmap)
                    .of(new DoFn<KV<String, Integer>, KV<String, Double>>()
                    ))

// Within the ParDo, get the side input from the process context 
public void processElement(ProcessContext c) throws Exception {
    Integer fromcz = c.sideInput(czmap).get(czkey); 
    // This uses .get() because the side input is a map.
}

So the crucial components are:

  • Converting PCollection to PCollectionView
  • Pass the PCollectionView as a side input
  • Once that's done, you can get the side input from the process context, and use it just like a regular map or list
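In Python, the same idea uses beam.pvalue.AsDict (or AsList / AsSingleton) plus an extra argument to the Map/ParDo. A sketch with hypothetical collections:

import apache_beam as beam

with beam.Pipeline() as p:
    main = p | "Main" >> beam.Create([("a", 1), ("b", 2)])
    side = p | "Side" >> beam.Create([("a", 10), ("b", 20)])

    # Convert the (smaller) side PCollection into a view - here, a map/dict
    side_view = beam.pvalue.AsDict(side)

    # Pass the view as an extra argument; inside the function it behaves like a regular dict
    scaled = main | "Scale" >> beam.Map(
        lambda kv, lookup: (kv[0], kv[1] * lookup[kv[0]]),
        lookup=side_view)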

Side Inputs Lab

Link to lab: https://codelabs.developers.google.com/codelabs/cpb101-bigquery-dataflow-sideinputs/

What The Lab Does

Main goal of the lab:

  • Use BigQuery as a data source for Dataflow
  • Use the result of one pipeline as a side input to another pipeline

Question:

  • If I am a Java programmer, and I want to know which open source projects to contribute to, how do I do that?
  • Find well-used, popular packages (using Github: look at all the Java code in Github, find all the import statements, find the most-used packages)
  • Meanwhile, we will also search all the Java code for "fixme" and "todo" comments
  • Then, combine the two - create a composite score (how popular, and how much help it needs)
  • Then take the top 1,000 of those and write them out

The Query

Start by testing out the BigQuery select query that we'll run.

(Optionally: click the triangle next to the project name > display project > "fh-bigquery"... lots of other interesting Github tables there: Java contents, JS contents, PHP/Python/SQL/Ruby/ipynb/Go contents, stars, file ages, etc.)

Start by seeing what the contents of the database look like:

SELECT
    content
FROM
    [fh-bigquery:github_extracts.contents_java_2016]
LIMIT
    10

Next, we can look at how many Java files are actually being indexed:

SELECT
    COUNT(*)
FROM
    [fh-bigquery:github_extracts.contents_java_2016]

2M Java files

The Pipeline Code

Now navigate to the github repository and look at JavaProjectsThatNeedHelp.java:

Here's the flow of the program's main method:

  • Create a BigQuery string
  • Create a PCollection (javaContent) that results from a BigQueryIO object created from reading the results of a query
  • Apply a (parallel) function "ToLines" that maps a TableRow to a String array; the process element function defined inside takes the "content" field (containing source code) and splits the contents at "\n" characters, returning a string array

Separately, prepare the side input:

  • Create a PCollectionView (packagesThatNeedHelp) that results in a String-Integer map
  • Apply a (parallel) function "NeedsHelp" that maps String[] array to a Key-Value pair (String-Integer)
  • Within the NeedsHelp function's processElement function (defined inside), call parsePackageStatement(lines) to get a string array of package names and call countCallsForHelp(lines) to get an integer count of how many lines contain FIXME or TODO
  • Add the pairing (packageName, numHelpNeeded) to the output map as a key-value pair

Continuing with the side channel pipeline:

  • Apply a Sum.integersPerKey() aggregation function to count the number of help-wanted statements (values) for each package (keys)
  • Turn it into a map

Back to the original pipeline:

  • For the javaContent PCollection, we already turned the BigQuery result into a String array of lines of code
  • Apply a function "IsPopular" to parse import statements, create a key-value map (keys are libraries imported, values are 1 each)
  • Apply a Sum.integersPerKey() aggregation function to count the number of times each package is used
  • Apply a "CompositeScore" function (this is where the side input comes in!)
  • The function maps a String-Integer key-value map to a String-Double key-value map
  • Get package names from process context elements .getKey() method
  • Get number of times used from the process context elements .getValue() method
  • Get number of cries for help from side input: pass the view to c.sideInput, which will return the map, then use map methods to get items from the map: c.sideInput(packagesThatNeedHelp).get(packageName)
  • If number of cries for help is not null, compute the score: log(number of times used) * log(number of cries for help)

Now that we've combined our two streams, we can limit to the top N:

  • Apply a Top_N function, which calls Top.of()
  • Apply a ToString function, which turns the String-Double map to a String
  • The process element function (defined internally for ToString) sorts a list of key-value pairs, then reverses the sort order, then prints out "<key>, <value>"

Final step:

  • Final step is to apply a TextIO.write() function to write the results to a CSV file without sharding

Run the Project

$ ./run_oncloud3.sh not-all-broken charlesreid1-dataflow JavaProjectsThatNeedHelp

which is basically doing this:

mvn compile -e exec:java \
 -Dexec.mainClass=$MAIN \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/staging/ \
      --outputPrefix=gs://$BUCKET/javahelp/output \
      --runner=DataflowRunner"

Downloads packages... runs... build success.

Note: gs://$BUCKET/staging contains ALL the jar files that are needed to run this code (lots)

$ gsutil ls gs://charlesreid1-dataflow/staging | wc -l
108

A few notes on interacting with google cloud storage:

  • If the output directory does not exist, it will not create the output directory itself
  • If the output directory does not exist, it will also NOT complain - it will run the entire job, and not dump anything out at all
  • If the directory exists and has a bunch of stuff in it already, you can't just move the old directory and expect it to create the new directory. It will not.
  • You also cannot create an empty directory locally and copy it over - it will not accept empty directories.
  • Procedure:
  • Create an empty directory
  • Touch a dummy file in the empty directory
  • Use gsutil cp -r to copy the directory plus dummy file to cloud storage bucket

The Results

Now, we examine the results.

$ gsutil cat gs://charlesreid1-dataflow/javahelp/output.csv | head -n20
org,179.7793389303244
com,175.43281411202364
org.apache,131.60835910566712
net,124.96262936713585
com.google,122.70986973135382
io,111.37299249725882
de,105.31330406537631
org.eclipse,104.1577613103832
java,102.22628520536045
edu,100.40464589157374
org.sakaiproject,96.09340029592454
org.elasticsearch,95.66166368451726
com.github,92.18472195070443
fr,91.6914097434803
com.google.common,91.27191142872923
com.facebook,89.32210589208778
com.intellij,88.3926367866704
android,88.0388239374179
me,86.72535725236294
com.google.devtools,86.27626028067898

Use a regular expression with grep to limit to more interesting results containing 2+ dots in the library name (the part before the comma):

$ gsutil cat gs://charlesreid1-dataflow/javahelp/output.csv | grep "^\w\{2,\}\.\w\{2,\}\..*,.*" | head -n 20
com.google.common,91.27191142872923
com.google.devtools,86.27626028067898
com.google.devtools.build,85.25452319892487
com.google.devtools.build.lib,83.64433400764051
org.apache.hadoop,79.57074456170453
org.apache.commons,76.6474001748202
org.projectfloodlight.openflow,76.29062724095293
fr.lip6.move,75.2226870554725
fr.lip6.move.pnml,75.05236165565037
org.projectfloodlight.openflow.protocol,74.7929840192626
com.facebook.presto,73.18559998427968
org.chromium.chrome,71.66935416565529
com.google.common.collect,71.55997742547656
com.evolveum.midpoint,71.39203241557313
com.facebook.buck,71.299805433909
org.pentaho.di,70.7780308794822
org.chromium.chrome.browser,70.4432062316782
com.google.javascript,70.03667894397347
org.openhab.binding,69.62266463424652
org.ovirt.engine,69.60843726601736

Streaming Data into Dataflow

Serverless data analysis and streaming

Previous labs: we have been running data using a batch workflow (reading stored data)

But we can apply same workflows/pipelines to streaming data sources (e.g., PubSub)

Dataflow handles out-of-order records, delays, etc. that happen with streaming sources

Can associate timestamp with inputs:

  • PubSub has an automatic timestamp
  • Timestamp is time that message was published to topic
  • For batch, we explicitly assign a timestamp wherever we might need one

Example:

c.outputWithTimestamp(f, Instant.parse(fields[2]));

This allows you to "trick" your streaming pipeline into handling batch data as well
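In Python, the analogous trick is to emit TimestampedValue elements; a sketch, assuming each record carries its event time in one of its fields:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

def add_timestamp(fields):
    # fields[2] is assumed to hold the event time as unix seconds
    return TimestampedValue(fields, float(fields[2]))

with beam.Pipeline() as p:
    records = p | beam.Create([["a", "b", "1507600000"], ["c", "d", "1507600060"]])
    stamped = records | "WithTimestamps" >> beam.Map(add_timestamp)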

Use windows for aggregating unbounded ("infinite") collections

Example: sliding window of 2 minutes, every 30 seconds

.apply("window", Window.into( SlidingWindows
                                                .of(Duration.standardMinutes(2))
                                                .every(Duration.standardSeconds(30))
))

Then when you do a group by key, group sums, aggregations, etc. - all carried out in the context of that sliding 2 minute window

Doing a mean-per-key is now no longer a mean per key over ENTIRE data set, it is now over the sliding window ONLY

Think of PubSub as the shock absorber - it absorbs all the data

That's then passed on to Dataflow, and added to BigQuery

You can still query BigQuery data warehouse, even as data are streaming in

Streaming Lab

Link to lab: https://codelabs.developers.google.com/codelabs/cpb101-bigquery-dataflow-streaming/

Objective:

  • Create and read from a PubSub topic
  • Use Dataflow to aggregate streaming records coming in (real-time) via PubSub (using windowed aggregate)
  • Stream aggregate statistics into BigQuery
  • Analyze results while data are streaming into data warehouse


Set Up PubSub and BigQuery

Start by creating BigQuery data set called "demo"

Then create a PubSub topic called "streamdemo"

Back to the same training-data-analyst repository: https://github.com/GoogleCloudPlatform/training-data-analyst

View the pipeline code at StreamDemoConsumer.java: https://github.com/GoogleCloudPlatform/training-data-analyst/blob/b867c7039fafbe9616b1a6304fc74d14159a05a5/courses/data_analysis/lab2/javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/StreamDemoConsumer.java

The Java PubSub Consumer Code

Overview of StreamDemoConsumer code:

Main method does the following:

  • Create pipeline options factory to extract command line arguments
  • Build table schema for the sink of this pipeline (timestamps, number of words)
  • Create the pipeline parts:
  • "GetMessages" function reads strings from the specified topic
  • "Window" applies a sliding window (2 minute window, every 30 seconds)
  • "WordsPerLine" (parallel function) splits String line and counts number of words
  • "WordsInTimeWindow" applies Sum.integersGlobally() aggregate function
  • "ToBQRow" (parallel function) creates a TableRow object and sets the two column values, "timestamp" and "num_words"
  • Finally, we apply BigQueryIO.writeTableRows(), TO the output table, WITH our schema
  • Last but not least, we actually run the pipeline.
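A rough Python sketch of the tail end of that pipeline, showing windowed counts being written to BigQuery (the table name and row-building details are assumptions, not the lab's actual code, which derives the timestamp from the window):

import datetime
import apache_beam as beam

def to_bq_row(num_words):
    # Hypothetical row shape matching the two-column schema described above
    return {"timestamp": datetime.datetime.utcnow().isoformat(),
            "num_words": num_words}

with beam.Pipeline() as p:
    word_counts = p | beam.Create([391, 486, 270])        # stand-in for the windowed counts
    (word_counts
     | "ToBQRow" >> beam.Map(to_bq_row)
     | "WriteBQ" >> beam.io.WriteToBigQuery(
           "my-project:demos.streamdemo",                 # project:dataset.table (assumed)
           schema="timestamp:TIMESTAMP,num_words:INTEGER",
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))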

To run, use the run_oncloud4.sh script, which hard-codes the name of the class.

IMPORTANT: this pipeline does not automatically exit - when you run the script, it will build the pipeline, submit the job, and will report a build success. It will then return you back to the command prompt.

To generate data for the PubSub topic: open the PubSub section of the Cloud Console, and look at the top for "Publish Message"

(I published a few messages)

Now go to BigQuery and see if you can query the data warehouse. I was not able to. No tables had shown up yet, no fields.

(Switched to Dataflow section of Cloud Console, saw that the graph was still being analyzed)

Waited a bit, switched back to BigQuery:

SELECT timestamp, num_words
FROM [not-all-broken:demos.streamdemo]
ORDER BY timestamp ASC

Bingo, some results came up:

timestamp,num_words
2017-10-17 00:09:08.267999 UTC,391
2017-10-17 00:09:36.210999 UTC,486
2017-10-17 00:10:07.535000 UTC,486
2017-10-17 00:10:35.854000 UTC,486
2017-10-17 00:11:08.479000 UTC,270
2017-10-17 00:11:35.796999 UTC,175
2017-10-17 00:12:08.414000 UTC,335
2017-10-17 00:12:35.891000 UTC,585
2017-10-17 00:13:08.217000 UTC,410
2017-10-17 00:13:35.536000 UTC,410
2017-10-17 00:14:07.852999 UTC,250
2017-10-17 00:15:07.492000 UTC,38

To stop the pipeline, use the Cloud SDK command line tool or the Cloud Console.

Open the job, click "Stop Job" on the right side.

  • Cancel - stops the pipeline immediately
  • Drain - finishes processing the rest of the contents of the PubSub stream

Recap

What did we do in module 2?

Dataflow:

  • Stream events, metrics, etc from Cloud PubSub into Cloud Dataflow
  • Raw logs, files, assets, Google Analytics data, etc from Cloud Storage into Cloud Dataflow
  • (Alt: batch stuff can go to Cloud Dataproc)
  • Dataflow is more flexible than Dataproc because of the ability to switch between batch and streaming
  • We also saw that Dataflow can dump results to BigQuery or to BigTable
  • BigQuery and BigTable, in turn, feed into Cloud Machine Learning (large scale) to train our own machine learning models
  • Extremely important for machine learning: you're training your machine learning model on historical data (e.g., log files), and you want to use it to MAKE PREDICTIONS from real-time data

This is a very common reference architecture on Google Cloud:

  • Stream/Batch feeds into Dataflow
  • Dataflow feeds into BigQuery/BigTable
  • BigQuery/BigTable feed into Cloud Machine Learning Engine


Resources

Via Google

Cloud Dataflow: https://cloud.google.com/dataflow

Which Java projects need help? https://medium.com/google-cloud/popular-java-projects-on-github-that-could-use-some-help-analyzed-using-bigquery-and-dataflow-dbd5753827f4

Processing logs at scale using Dataflow: https://cloud.google.com/solutions/processing-logs-at-scale-using-dataflow

More solutions from Google Cloud: https://cloud.google.com/solutions/

Other

Docker image with Google Cloud SDK and BigQuery: https://blog.openbridge.com/how-to-make-learning-google-cloud-and-bigquery-easy-76fcdd3a61b7

PubSub

PubSub Push Subscriber Guide: https://cloud.google.com/pubsub/docs/push

PubSub Pull Subscriber Guide: https://cloud.google.com/pubsub/docs/pull

PubSub Subscriber Overview: https://cloud.google.com/pubsub/docs/subscriber

Firebase (mobile platform) PubSub Triggers: https://firebase.google.com/docs/functions/pubsub-events

Firebase Cloud Functions: https://firebase.google.com/docs/functions/use-cases

Triggers on cloud functions:
