How to Run a Big Data Text Processing Pipeline in Cloud Dataflow?

Tudip

27 March 2019

Steps to Run Big Data Text Processing Pipeline in Cloud Dataflow

  • Overview:

    • Dataflow is a unified programming model and a managed service for developing and executing a wide range of data processing patterns, including ETL and continuous computation. Because Dataflow is a managed service, it can allocate resources on demand to minimize latency while maintaining high utilization efficiency. This blog introduces the setup and requirements needed to run a big data text processing pipeline in Cloud Dataflow.
    • The Dataflow model combines batch and stream processing, so developers don’t have to make tradeoffs between correctness, cost, and processing time. In this post, you’ll learn how to run a Dataflow pipeline that counts the occurrences of unique words in a text file.
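To make the goal concrete, the same word-frequency computation can be sketched locally with standard Unix tools. This is only an illustration of what "counting unique words" means, not the Dataflow pipeline itself; the function name `count_words` is made up for this sketch, and it treats any run of non-letters as a word separator, which may differ from the tokenization used by the example pipeline.

```shell
# count_words: read text on stdin and print "<count> <word>" lines,
# most frequent first. Hypothetical helper for illustration only.
count_words() {
  # 1. Replace every run of non-letters with a single newline (one word per line).
  # 2. Drop empty lines, then sort so identical words are adjacent.
  # 3. Count adjacent duplicates and normalize uniq's padded output.
  # 4. Order by count (descending), then by word, with a fixed C locale.
  LC_ALL=C tr -cs 'A-Za-z' '\n' |
    grep -v '^$' |
    LC_ALL=C sort |
    uniq -c |
    awk '{ print $1, $2 }' |
    LC_ALL=C sort -k1,1nr -k2,2
}
```

For example, `count_words < some-file.txt | head` shows the ten most frequent words of a local file. Dataflow performs the same kind of grouping and counting, but distributes it across managed workers.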
  • Setup and Requirements:

    • Sign in to the Google Cloud Platform console (console.cloud.google.com) and create a new project. Remember that the project ID is a unique name across all Google Cloud projects.

    • Click the Menu icon, go to the API Manager page, and enable the following APIs:

      • Google Compute Engine API
      • Google Dataflow API
      • Stackdriver Logging API
      • Google Cloud Storage
      • Google Cloud Storage JSON API
      • BigQuery API
      • Google Cloud Pub/Sub API
      • Google Cloud Datastore API
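If you prefer the command line to the console, the same APIs can probably be enabled with `gcloud services enable`. The sketch below is hedged: the function name `enable_dataflow_apis` is hypothetical, and the service IDs are my mapping of the console labels above to `*.googleapis.com` names, so confirm them with `gcloud services list --available` before relying on them.

```shell
# enable_dataflow_apis: enable the APIs listed above for one project.
# ASSUMPTION: the service IDs below correspond to the console labels;
# verify them with `gcloud services list --available`.
enable_dataflow_apis() {
  project_id="$1"
  if [ -z "$project_id" ]; then
    echo "usage: enable_dataflow_apis <PROJECT_ID>" >&2
    return 2
  fi
  gcloud services enable \
    compute.googleapis.com \
    dataflow.googleapis.com \
    logging.googleapis.com \
    storage-component.googleapis.com \
    storage-api.googleapis.com \
    bigquery.googleapis.com \
    pubsub.googleapis.com \
    datastore.googleapis.com \
    --project "$project_id"
}
```

Call it as `enable_dataflow_apis <PROJECT_ID>` from Cloud Shell or any terminal where `gcloud` is authenticated.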
  • Create a new Cloud Storage bucket:

    • In the Google Cloud console, click the Menu icon and go to the Storage option.
    • Click the Create bucket button and fill in the details as needed. Make sure the bucket name is globally unique.
    • Once the bucket has been created successfully, you will see the message “There are no objects in this bucket.”
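The bucket can also be created from a terminal with `gsutil mb`. The sketch below first runs a cheap local check of the name against the basic Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dots, dashes, and underscores; starting and ending with a letter or digit). Both function names are hypothetical, and the check cannot tell you whether the name is already taken globally; only the create call can.

```shell
# is_plausible_bucket_name: local sanity check of a bucket name.
# It does NOT check global uniqueness (only Cloud Storage can do that).
is_plausible_bucket_name() {
  printf '%s' "$1" |
    LC_ALL=C grep -Eq '^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$'
}

# create_bucket: validate the name, then create the bucket in a project.
# Hypothetical wrapper around `gsutil mb`.
create_bucket() {
  bucket_name="$1"
  project_id="$2"
  if ! is_plausible_bucket_name "$bucket_name"; then
    echo "error: '$bucket_name' is not a valid bucket name" >&2
    return 1
  fi
  gsutil mb -p "$project_id" "gs://${bucket_name}"
}
```

Usage: `create_bucket <your_bucket_name> <PROJECT_ID>`. If the name is already in use by someone else, `gsutil` reports the conflict and you need to pick another name.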
  • Start Cloud Shell:

    • Open Cloud Shell and set the project ID as the default by using the command
       gcloud config set project <PROJECT_ID>
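It can be worth confirming that the default project really changed before continuing. A minimal sketch, assuming `gcloud` is on the path; the function name `active_project_is` is hypothetical:

```shell
# active_project_is: succeed only if gcloud's default project equals $1.
active_project_is() {
  expected="$1"
  # `gcloud config get-value project` prints the currently configured project.
  actual="$(gcloud config get-value project 2>/dev/null)"
  [ -n "$expected" ] && [ "$actual" = "$expected" ]
}
```

For example, `active_project_is <PROJECT_ID> || echo "wrong project selected"` flags a mismatch early, before any resources are created in the wrong project.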
  • Create a Maven project:

    • After setting the project ID, let’s get started by creating a Maven project containing the Cloud Dataflow SDK for Java.
    • Run the following mvn archetype:generate command in Cloud Shell:
       mvn archetype:generate \
         -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
         -DarchetypeGroupId=com.google.cloud.dataflow \
         -DarchetypeVersion=1.9.0 \
         -DgroupId=com.example \
         -DartifactId=first-dataflow \
         -Dversion="0.1" \
         -DinteractiveMode=false \
         -Dpackage=com.example
    • After running the command, you should see a new directory called first-dataflow under your current directory. This contains a Maven project that includes the Cloud Dataflow SDK for Java and example pipelines.
  • Run a text processing pipeline on Cloud Dataflow:

    • Save your project ID and Cloud Storage bucket name as environment variables by using these commands:
       export PROJECT_ID=<your_project_id>
       export BUCKET_NAME=<your_bucket_name>
    • Go to the first-dataflow directory using Cloud Shell.
    • We are going to run the pipeline called WordCount, which reads text, splits it into individual words (tokens), and performs a frequency count on each of those words.
    • Start the pipeline by running the mvn compile exec:java command in Cloud Shell or a terminal window.

    • For the --project, --stagingLocation, and --output arguments, the command below references the environment variables you set up earlier in this step. Note that each argument starts with two plain hyphens and that the quotes are straight double quotes.
       mvn compile exec:java \
         -Dexec.mainClass=com.example.WordCount \
         -Dexec.args="--project=${PROJECT_ID} \
         --stagingLocation=gs://${BUCKET_NAME}/staging/ \
         --output=gs://${BUCKET_NAME}/output \
         --runner=BlockingDataflowPipelineRunner"
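A common failure at this step is launching the job with PROJECT_ID or BUCKET_NAME unset, which produces paths such as `gs:///staging/`. The sketch below guards against that and assembles the pipeline arguments in one place. The helper names `require_vars` and `dataflow_args` are hypothetical; the argument names and the runner are the ones used in this post.

```shell
# require_vars: fail with a message if any named variable is unset or empty.
require_vars() {
  for var_name in "$@"; do
    # Look up the variable whose name is stored in $var_name.
    eval "value=\${$var_name:-}"
    if [ -z "$value" ]; then
      echo "error: $var_name is not set" >&2
      return 1
    fi
  done
}

# dataflow_args: print the pipeline arguments for -Dexec.args.
dataflow_args() {
  require_vars PROJECT_ID BUCKET_NAME || return 1
  printf '%s' "--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
}
```

With these defined, the launch becomes `mvn compile exec:java -Dexec.mainClass=com.example.WordCount -Dexec.args="$(dataflow_args)"`, and a missing variable produces a clear error instead of a confusing job failure.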
  • Check that your job succeeded:

    • Go to the Cloud Dataflow Monitoring UI, where you will see the job you created earlier, and check whether its status is Succeeded.
    • The job starts in the Running state; wait 4-5 minutes for it to complete.
    • Remember when you ran the pipeline and specified an output bucket? Take a look at the output and staging files in that bucket.
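The results can also be inspected from Cloud Shell. The sketch below assumes that each output line has the form `word: count` and that the output files share the `output` prefix given to the pipeline; check one file first if your output looks different. The function name `top_words` is hypothetical.

```shell
# top_words: read "word: count" lines on stdin and print the N most
# frequent words (default 10). ASSUMPTION: lines look like "word: count".
top_words() {
  limit="${1:-10}"
  # Sort by the numeric field after the colon (descending), then by word.
  LC_ALL=C sort -t ':' -k2,2nr -k1,1 | head -n "$limit"
}
```

For example, `gsutil ls gs://${BUCKET_NAME}/` lists the output and staging files, and `gsutil cat "gs://${BUCKET_NAME}/output*" | top_words 5` prints the five most frequent words across all output files.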
  • Shut down your resources:

    • When your work is complete, go to Storage in the console and delete the bucket you created earlier so that you are no longer using those resources.
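The cleanup can be done from the command line as well. A minimal sketch, assuming the bucket holds nothing you want to keep; the function name `delete_bucket` is hypothetical, and the guard exists so that an empty variable never expands to a bare `gs://`.

```shell
# delete_bucket: remove a bucket and everything in it.
# WARNING: this is irreversible.
delete_bucket() {
  bucket_name="$1"
  if [ -z "$bucket_name" ]; then
    echo "usage: delete_bucket <BUCKET_NAME>" >&2
    return 2
  fi
  # -r deletes all objects in the bucket and then the bucket itself.
  gsutil rm -r "gs://${bucket_name}"
}
```

Usage: `delete_bucket "${BUCKET_NAME}"`.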
  • Acknowledgment:

 
