From charlesreid1

Line 259: Line 259:


===Data Pipelines Lab===
===Data Pipelines Lab===
====Serverless Data Pipeline in Java====
Start by running the command to generate a local Maven prototype project
for a Dataflow pipeline:
<pre>
$ mvn archetype:generate \
  -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-starter \
  -DarchetypeGroupId=com.google.cloud.dataflow \
  -DgroupId=com.example.pipelinesrus.newidea \
  -DartifactId=newidea \
  -Dversion="[1.0.0,2.0.0]" \
  -DinteractiveMode=false
</pre>
This creates a new Maven project structure, with a ready-to-go
barebones class that contains the basic structure of a Pipeline.
(This comes from the archetype artifact, the Dataflow starter.)
Next, let's actually use some pre-built pipelines.
<pre>
$ git clone https://github.com/GoogleCloudPlatform/training-data-analyst
</pre>
This repo contains some pre-built pipelines.
Now add Java to the path, so that we can compile and run this pipeline
(this uses Apache Beam, so there is a whoooole bunch of libraries
that are downloaded by Maven before this runs).
This builds and runs the pipeline locally:
<pre>
$ export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
$ mvn compile -e exec:java \
  -Dexec.mainClass=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO] -----------------------------------------------------------------------
-
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------------
-
[INFO] Total time: 33.430 s
[INFO] Finished at: 2017-10-10T21:08:52-07:00
[INFO] Final Memory: 37M/91M
[INFO] -----------------------------------------------------------------------
$
</pre>
Now, we can run the pipeline on the cloud.
First, we copy some of the code over to Google Cloud Storage.
The reason we do this is to illustrate how we would perform
a grep on files that are sitting in Google Cloud Storage.
(This is just an arbitrary example...)
The Grep program here is searching for each instance
of the word "import" in this set of Java files.
We copy it over to Google cloud storage using the gsutil:
<pre>
$ gsutil cp src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://charlesreid1-dataflow/javahelp
</pre>
Next, we modify the local version of the Java files (note these are the ones
that are actually run on the Google Cloud Dataflow platform, so we don't need
to modify them before copying them over to Google Cloud storage).
We modify them by specifying an input file and an output file that are
located ON Google Cloud storage, specifying a URL in the form of
gs://<path-to-java-files>.
Now, we can use the run_oncloud script, which is basically just a call to
Maven that looks similar to the call above, but with a few
extra specifications:
<pre>
$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep
project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/target/classes
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_27_30-5002896350771966928
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.808 s
[INFO] Finished at: 2017-10-10T21:27:31-07:00
[INFO] Final Memory: 44M/188M
[INFO] ------------------------------------------------------------------------
$
</pre>
Now that the Dataflow job is submitted, we can go to the Google Cloud console
and monitor Dataflow jobs from there. The first result we saw was a failed job:
<pre>
(abd61e2c1e6bfed1): Workflow failed. Causes: (abd61e2c1e6bf6e2): There was a problem refreshing your credentials. Please check:
1. Dataflow API is enabled for your project.
2. There is a robot service account for your project:
service-[project number]@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to your project. If this account does not appear in the permissions tab for your project, contact Dataflow support.
</pre>
To fix this:
* Click the hamburger menu (right-hand side menu in Console)
* Click APIs
* The Dataflow API appeared to be already enabled (but, apparently NOT)
* Clicked the + sign at the top to "Enable APIs"
* This brought up a search bar
* Searched for Dataflow API
* This pulled up the Datafow API and a blue Enable button
* Clicked the Enable button
* Now there was a green check mark next to the Dataflow API that was not there before
Then I resubmitted my job through Maven.
<pre>
$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep
project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_34_37-14825590390669419114
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.268 s
[INFO] Finished at: 2017-10-10T21:34:37-07:00
[INFO] Final Memory: 33M/79M
[INFO] ------------------------------------------------------------------------
$
</pre>
This time, opening the job in the Cloud Console,
I saw a couple of boxes with green check marks.
Everything is doing just fine.
Succeeded in 2 minutes 47 seconds.
Also, region was us-central1 (no idea how to set that.)
To verify the job worked, cat the results of the program
(which are in output.txt):
<pre>
$ gsutil cat gs://charlesreid1-dataflow/javahelp/output.txt
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
* A dataflow pipeline that finds the most commonly imported packages
                final String keyword = "import";
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
                final String searchTerm = "import";
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
* imports elsewhere) (b) needs help (count the number of times this package has
                final String keyword = "import";
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
        // e.g: import java.util.List; --> java.util.List, java.util, java
</pre>
====Serverless Data Pipeline in Python====
Start by running the install script to update pip:
<pre>
$ sudo ./install_packages.sh
</pre>
Then inspect the grep.py script:
<pre>
$ cat grep.py
#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
import apache_beam as beam
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam
import re
import sys
def my_grep(line, term):
  if re.match( r'^' + re.escape(term), line):
      yield line
if __name__ == '__main__':
  p = beam.Pipeline(argv=sys.argv)
  input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
  output_prefix = '/tmp/output'
  searchTerm = 'import'
  # find all lines that contain the searchTerm
  (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
  )
  p.run()
</pre>
Execute it locally:
<pre>
$ python grep.py
Traceback (most recent call last):
  File "grep.py", line 16, in <module>
    import apache_beam as beam
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/__init__.py", line 78, in <module>
    from apache_beam import io
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/__init__.py", line 21, in <module>
    from apache_beam.io.avroio import *
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 29, in <module>
    from apache_beam.io import filebasedsource
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 33, in <module>
    from apache_beam.io.filesystems import FileSystems
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 31, in <module>
    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsfilesystem.py", line 27, in <module>
    from apache_beam.io.gcp import gcsio
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py", line 36, in <module>
    from apache_beam.utils import retry
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 38, in <module>
    from apitools.base.py.exceptions import HttpError
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/__init__.py", line 21, in <module>
    from apitools.base.py.base_api import *
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 31, in <module>
    from apitools.base.protorpclite import message_types
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/message_types.py", line 25, in <module>
    from apitools.base.protorpclite import messages
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
    class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
    metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
$ sudo pip install six==1.10.0
Collecting six==1.10.0
  Downloading six-1.10.0-py2.py3-none-any.whl
Installing collected packages: six
  Found existing installation: six 1.11.0
    Uninstalling six-1.11.0:
      Successfully uninstalled six-1.11.0
Successfully installed six-1.10.0
$ python grep.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <
type 'NoneType'>.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)
$ # also see https://stackoverflow.com/questions/46300173/import-apache-beam-metaclass-conflict
</pre>
Tried to prepare a pull request to add the six to the install_packages.sh script.
Turns out someone already fixed it. Oooookay.
Now copy over Java files to GCP (now I see, these are just copied there so we have something to look at):
<pre>
$ gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://
</pre>
Edit the script grepc.py to set the bucket and project names.
Submit the job:
<pre>
$ python grepc.py
</pre>
Now go to Cloud Console and check on the Dataflow jobs. The job runs okay.
Job succeeded in 4 minutes 32 seconds.
<pre>
$ gsutil cat gs://charlesreid1-dataflow/output*
</pre>
dumps out everything you'd expect.


===MapReduce with Dataflow===
===MapReduce with Dataflow===

Revision as of 23:16, 17 October 2017

Serverless Data Analysis with Dataflow

Module 2: Data Processing Pipelines with Dataflow

What Is Dataflow

Dataflow:

  • Way to execute data processing pipelines on the cloud
  • Flexible sources/sinks (e.g., read from BigQuery and write to Cloud Storage)
  • Steps - transforms - are elastic, can be scaled to more machines as needed
  • Code is written using open source API (Apache Beam)
  • Cloud Dataflow is the Apache Beam "pipeline service"
  • Other Apache Beam pipeline services: Flink, Spark
  • Example: read from GCS, perform filtering, perform grouping, perform transform, then write results to GCS

Each step: user-defined code (Java or Python classes)

ParDo - can run a particular transform in the context of a parallel do

Why Dataflow?

  • Batch or streaming
  • Cloud Storage - batch data (e.g., historical data) source
  • Cloud PubSub - streaming source
  • Can use the SAME PIPELINE for both scenarios
  • Can have Dataflow write to various sinks
  • BigQuery - batch results storage sink
  • Cloud Storage - batch results storage sink
  • PubSub - streaming results sink

For streaming cases:

  • Define a sliding window for streaming data
  • Change input and output to read from an UNBOUNDED source
  • Then define a window, e.g., 60 minutes

Data Pipelines

Can write pipelines in Java or Python

Concepts:

  • Pipeline - set of steps (transforms)
  • The pipeline is executed on the cloud by a runner
  • Apache Beam code forms the pipeline, Dataflow is the runner
  • Each step is elastically scaled
  • Source - where the input data comes from
  • Sink - where the transformed data goes

Pcollection:

  • Each transform on the pipeline takes a parallel collection (Pcollection) as an input
  • Pcollection - a list or map of items that does not need to be bounded by the size of the machine, does not need to fit into memory

Pipeline:

  • Directed graph of steps
  • Read in data, transform it, write data out
  • Example Java pipeline:
import org.apache.beam.sdk.Pipeline;

public static void main(String[] args) {
    // Create pipeline 
    // Parameterize with input args
    Pipeline p = Pipeline.create(PipelineOptionsFactoryfromArgs(arg));

    p.apply(TextIO.Read.from("gs://..."))   // Read the input
     .apply(new CountWords())               // Count (process) the text
     .apply(TextIO.Write.to("gs://..."));   // Write output to GCS

    // Now run the pipeline
    p.run();
}

p.run() executes the pipeline "graph" on the runner that will execute the pipeline

Direct runner - runs the pipeline on a single instance of the local machine

Dataflow runner - graph gets launched on the cloud

Python API: similar feel...

import apache_beam as beam

if __name__=="__main__":
    # Create pipeline 
    # Parameterize on input args
    p = beam.Pipeline(argv = sys.argv)

    (p
        | beam.io.ReadFromText("gs://...")              # Read input
        | beam.FlatMap(labda line: count_words(line))   # Process
        | beam.io.WriteToText("gs://...")               # Write output
    )

    p.run()     # Run the pipeline

Python uses the pipe operator to carry out transforms in sequence.

Step 1: create graph

Step 2: run it

Pcollections

Input to transform: Pcollection

Output from transform: Pcollection

All data in pipeline is represented with a Pcollection

# Java:
PCollection<String> lines = p.apply(...)

We can also define a transform to happen within a ParDo context, which will parallelize the transform, by defining a DoFn

Above, we define a collection of Strings called lines.

Below, we perform a transform for each line (each String in the collection called lines)

PCollection<Integer> sizes = 
    lines.apply("Length",
                parDo.of(new DoFn<String, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception{
                        String line = c.element();
                        c.output(line.length());
                    }
                }
            ));

Above - anonymous function that inherits from DoFn, defines processElement() method, result is a collection of integers that you can then transform with the next step

In Python:

lines = p | ...

Now, for every line that comes in, return the length of the line:

sizes = lines | "Length" >> beam.Map( lambda line : len(line))

This name is important - shows up in the monitoring console

Dataflow allows you to replace parts in a pipeline, WITHOUT ANY LOSS OF DATA (any data not processed by old pipeline will be processed by new pipeline)

But for that exchange of transforms to work, they need to have unique names

In Python, overloading >> so that "Length" >> beam.Map(...) means "call this map Length and have it perform a Map operation"

Executing Pipelines

To process data with pipelines, need to read data into pipelines

Can read input data from GCS, BigQuery, PubSub

Will look at a few example pipelines, first in Java, then in Python.

Java Pipelines

Example of reading text into a String:

// Note the wildcard syntax
PCollection<String> lines = p.apply(TextIO.Read.from("gs://.../input-*.csv.gz")

Example of reading data from PubSub:

PCollection<String> lines = p.apply(PubsubIO.Read.from("input_topic"));

Example of running a BigQuery query and returning a table row:

String javaQuery = "SELECT x, y, z FROM [project:dataset.tablename]";
PCollection<TableRow> javaContent = p.apply(BigQueryIO.Read.fromQuery(javaQuery));

The last example returns a PCollection of TableRow objects

Likewise with sinks - whatever you can read, you can also write

Write data to file system, GCS, BigQuery, or PubSub

lines.apply(TextIO.Write.to("/data/output").withSuffix(".txt")

If the output files are very small, and you don't want to deal with the hassle of sharding, can also say:

.apply(TextIO.Write.to("/data/output").withSuffix(".csv").withoutSharding()

(Note that this requires all I/O to happen on a single machine)

This may require transformation of relevant data from (whatever type) to String before writing out using TextIO (which only accepts Strings)

To execute your pipeline, two options:

  • Option 1: run Java, pass it the classpath, give it the name of the file/class with the main method
  • Option 2: run using maven

Option 2 looks like:

mvn compile -e exec:java -Dexec.mainClass=${MAIN}

And to run in the cloud, use mvn to submit the job to Dataflow

mvn compile -e exec:java \
-Dexec.mainClass=$MAIN \
-Dexec.args="--project=$PROJECT \
--stagingLocation=gs://$BUCKET/staging/ \
--tempLocation=gs://$BUCKET/staging/ \
--runner=DataflowRunner"

If using Java version of Dataflow, use Maven

  • Default is to use a local runner
  • Can add arguments to specify project (b/c this controls billing), staging location (where to stage code), temporary location (optional), and runner to use

Python Pipelines

Python pipeline execution:

Running locally just requires running the program without args:

python ./my-grep.py

To run in the cloud, specify parameters:

python ./my-grep.py \
--project=$PROJECT \
--job_name=myjob \
--staging_location=gs://$BUCEKT/staging/ \
--temp_location=gs://$BUCKET/staging \
--runner=DataflowRunner

Only difference is that Python requires a job name...

So far, we looked at what Dataflow is and some simple Dataflow concepts

Now we will write/implement a Dataflow pipeline

Data Pipelines Lab

Serverless Data Pipeline in Java

Start by running the command to generate a local Maven prototype project for a Dataflow pipeline:

$ mvn archetype:generate \
  -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-starter \
  -DarchetypeGroupId=com.google.cloud.dataflow \
  -DgroupId=com.example.pipelinesrus.newidea \
  -DartifactId=newidea \
  -Dversion="[1.0.0,2.0.0]" \
  -DinteractiveMode=false

This creates a new Maven project structure, with a ready-to-go barebones class that contains the basic structure of a Pipeline. (This comes from the archetype artifact, the Dataflow starter.)

Next, let's actually use some pre-built pipelines.

$ git clone https://github.com/GoogleCloudPlatform/training-data-analyst

This repo contains some pre-built pipelines.

Now add Java to the path, so that we can compile and run this pipeline (this uses Apache Beam, so there is a whoooole bunch of libraries that are downloaded by Maven before this runs).

This builds and runs the pipeline locally:

$ export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH

$ mvn compile -e exec:java \
  -Dexec.mainClass=com.google.cloud.training.dataanalyst.javahelp.Grep

[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...

[INFO] -----------------------------------------------------------------------
-
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------------
-
[INFO] Total time: 33.430 s
[INFO] Finished at: 2017-10-10T21:08:52-07:00
[INFO] Final Memory: 37M/91M
[INFO] -----------------------------------------------------------------------

$

Now, we can run the pipeline on the cloud.

First, we copy some of the code over to Google Cloud Storage. The reason we do this is to illustrate how we would perform a grep on files that are sitting in Google Cloud Storage. (This is just an arbitrary example...)

The Grep program here is searching for each instance of the word "import" in this set of Java files.

We copy it over to Google cloud storage using the gsutil:

$ gsutil cp src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://charlesreid1-dataflow/javahelp

Next, we modify the local version of the Java files (note these are the ones that are actually run on the Google Cloud Dataflow platform, so we don't need to modify them before copying them over to Google Cloud storage).

We modify them by specifying an input file and an output file that are located ON Google Cloud storage, specifying a URL in the form of gs://<path-to-java-files>.

Now, we can use the run_oncloud script, which is basically just a call to Maven that looks similar to the call above, but with a few extra specifications:

$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep

project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 4 source files to /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/target/classes
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_27_30-5002896350771966928
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 17.808 s
[INFO] Finished at: 2017-10-10T21:27:31-07:00
[INFO] Final Memory: 44M/188M
[INFO] ------------------------------------------------------------------------

$ 

Now that the Dataflow job is submitted, we can go to the Google Cloud console and monitor Dataflow jobs from there. The first result we saw was a failed job:

 (abd61e2c1e6bfed1): Workflow failed. Causes: (abd61e2c1e6bf6e2): There was a problem refreshing your credentials. Please check:
1. Dataflow API is enabled for your project.
2. There is a robot service account for your project:
service-[project number]@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to your project. If this account does not appear in the permissions tab for your project, contact Dataflow support.

To fix this:

  • Click the hamburger menu (right-hand side menu in Console)
  • Click APIs
  • The Dataflow API appeared to be already enabled (but, apparently NOT)
  • Clicked the + sign at the top to "Enable APIs"
  • This brought up a search bar
  • Searched for Dataflow API
  • This pulled up the Datafow API and a blue Enable button
  • Clicked the Enable button
  • Now there was a green check mark next to the Dataflow API that was not there before

Then I resubmitted my job through Maven.

$ ./run_oncloud1.sh not-all-broken charlesreid1-dataflow Grep

project=not-all-broken  bucket=charlesreid1-dataflow  main=com.google.cloud.training.dataanalyst.javahelp.Grep
[INFO] Error stacktraces are turned on.
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building javahelp [1.0.0,2.0.0]
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ javahelp ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/charlesreid1/training-data-analyst/courses/data_analysis/lab2/javahelp/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ javahelp ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ javahelp ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Dataflow SDK version: 2.1.0
Submitted job: 2017-10-10_21_34_37-14825590390669419114
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.268 s
[INFO] Finished at: 2017-10-10T21:34:37-07:00
[INFO] Final Memory: 33M/79M
[INFO] ------------------------------------------------------------------------

$ 


This time, opening the job in the Cloud Console, I saw a couple of boxes with green check marks.

Everything is doing just fine.

Succeeded in 2 minutes 47 seconds.

Also, region was us-central1 (no idea how to set that.)

To verify the job worked, cat the results of the program (which are in output.txt):

$ gsutil cat gs://charlesreid1-dataflow/javahelp/output.txt

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
 * A dataflow pipeline that finds the most commonly imported packages
                final String keyword = "import";
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
                final String searchTerm = "import";
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
 * imports elsewhere) (b) needs help (count the number of times this package has
                final String keyword = "import";
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
        // e.g: import java.util.List; --> java.util.List, java.util, java


Serverless Data Pipeline in Python

Start by running the install script to update pip:

$ sudo ./install_packages.sh

Then inspect the grep.py script:

$ cat grep.py

#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
import apache_beam as beam
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import re
import sys

def my_grep(line, term):
   if re.match( r'^' + re.escape(term), line):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run()

Execute it locally:


$ python grep.py
Traceback (most recent call last):
  File "grep.py", line 16, in <module>
    import apache_beam as beam
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/__init__.py", line 78, in <module>
    from apache_beam import io
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/__init__.py", line 21, in <module>
    from apache_beam.io.avroio import *
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/avroio.py", line 29, in <module>
    from apache_beam.io import filebasedsource
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 33, in <module>
    from apache_beam.io.filesystems import FileSystems
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 31, in <module>
    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsfilesystem.py", line 27, in <module>
    from apache_beam.io.gcp import gcsio
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/gcsio.py", line 36, in <module>
    from apache_beam.utils import retry
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 38, in <module>
    from apitools.base.py.exceptions import HttpError
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/__init__.py", line 21, in <module>
    from apitools.base.py.base_api import *
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 31, in <module>
    from apitools.base.protorpclite import message_types
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/message_types.py", line 25, in <module>
    from apitools.base.protorpclite import messages
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
    class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
    metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases


$ sudo pip install six==1.10.0

Collecting six==1.10.0
  Downloading six-1.10.0-py2.py3-none-any.whl
Installing collected packages: six
  Found existing installation: six 1.11.0
    Uninstalling six-1.11.0:
      Successfully uninstalled six-1.11.0
Successfully installed six-1.10.0

$ python grep.py

No handlers could be found for logger "oauth2client.contrib.multistore_file"
/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <
type 'NoneType'>.
  warnings.warn('Using fallback coder for typehint: %r.' % typehint)

$ # also see https://stackoverflow.com/questions/46300173/import-apache-beam-metaclass-conflict

Tried to prepare a pull request to add the six to the install_packages.sh script.

Turns out someone already fixed it. Oooookay.

Now copy over Java files to GCP (now I see, these are just copied there so we have something to look at):

$ gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://

Edit the script grepc.py to set the bucket and project names.

Submit the job:

$ python grepc.py

Now go to Cloud Console and check on the Dataflow jobs. The job runs okay.

Job succeeded in 4 minutes 32 seconds.

$ gsutil cat gs://charlesreid1-dataflow/output* 

dumps out everything you'd expect.

MapReduce with Dataflow

MapReduce Lab

Side Inputs

Side Inputs Lab

Streaming Data into Dataflow

Streaming Lab

Resources

Flags