Using Google Cloud Functions to Create a Simple POST Endpoint

Apr 24, 2019 Google Cloud Platform Python

I was tempted to title this "how to use GCF to create a simple ETL process", but that's not quite what I'm demonstrating here.

It does loosely fit the description of an ETL process - the script extracts some values from a POSTed payload, rearranges some of the values to fit a specific schema, then loads the transformed payload into a data store.

But what you're going to see here is not the heavy lifting we normally think of when we see the acronym "ETL".

And maybe that's a good thing, as it illustrates the beautiful simplicity of Google's new Cloud Function service.

Some background:

I work on a data infrastructure team that already has an account and a project set up on Google Cloud Platform. That project is already associated with a data store - a BigQuery project/dataset. I'm not going to cover how to set all that up since it's out of scope here, but you can start with these docs: https://cloud.google.com/docs/

I'm currently working on a project to accept realtime event data from a media platform we work with. We expect the data to come in at a medium-to-high volume, but we're still in testing so I don't have details on how well this job will handle the volume or how well it will scale - that will come later.

The project:

What I am going to talk about is this flow, with some general info on how to build the tools I needed to handle each step:

  • vendor POSTs the event data payload to my http endpoint
  • we receive, validate, and transform the payload data
  • we write that data to a BigQuery table

The pieces I had to build to do this:

  • a Google Cloud Function
  • a BigQuery table

gcloud:

Before we go much further - assuming that you already have a Google Cloud project, with a BigQuery dataset, and all the permissions set up to link the two - you will also need the gcloudcommand line tool. Go here and follow the steps to install:

https://cloud.google.com/sdk/docs/quickstart-macos

gcloud is what you'll use to deploy your function to Google Cloud. Installation will update your PATH to include the Google Cloud SDK in ~/.bash_profile. You may need to go through some authorization steps using the email address you have associated with your project. You may not need to add any gcloud components, although if you do instructions are included in the installation output.

For the example here, you should probably have these components:

  • BigQuery Command Line Tool
  • Cloud SDK Core Libraries
  • Cloud Storage Command Line Tool

Setting up the script:

In a local folder, do some of the basic setup you normally would to start a Python project:

  • create a main.py - this wil be your script
  • add a requirements.txt for any libraries you might need to install
  • use virtualenv to keep everything contained, particularly if you're going to test locally

In your main.py, you are free to build your Python script in whatever way works for you. You can import any libraries you might need, you script structure can be as simple or as complex as you need it to be.

The only key requirement is that you name a function that will be the entry point for your script - that name will be how your function is referenced in the GCP dashboard, and will be used to deploy the code to GCP.

The code:

Now (finally!) let's look at some sample code:

In main.py, I've built a simple Flask app (I love how easily Flask handles POSTs).

import json

from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)

I'll use this schema both to create my BigQuery table and to insert rows. This schema example includes several common column types used in BigQuery.

schema = [
    bigquery.SchemaField('timestamp', 'TIMESTAMP', 'NULLABLE'),
    bigquery.SchemaField('event_type', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('event_id', 'STRING', 'NULLABLE'),
    bigquery.SchemaField('has_insights', 'BOOLEAN', 'NULLABLE'),
    bigquery.SchemaField('video_insights', 'RECORD', 'REPEATED',
        fields=[
            bigquery.SchemaField('video_id', "STRING", 'NULLABLE'),
            bigquery.SchemaField('video_duration', "INTEGER", 'NULLABLE'),
        ],
    ),
    bigquery.SchemaField('categories', 'STRING', 'REPEATED'),
]
dataset = 'my_dataset'
table_name = 'my_events_table'

Here's the events() function that does a few things:

  • simple validation of the POST contents, the 'payload' (make sure it's JSON and that 'events' are included)
def events(request):
    payload = {}
    try:
        payload = request.get_json()
        events = payload['events']
    except Exception as e:
        response = app.response_class(
            response=json.dumps({'error': e.message}),
            status=400,
            mimetype='application/json'
        )
        return response

  • create the table in BigQuery (or not - this can be done in a few different ways, I've just included the example here as a convenience)
    try:
        create_table()
    except Exception as e:
        print("ERROR", e)

  • for each event, extract a few values, add them to a dict, add that dict to a new list (this is the 'transform' part of our mini-ETL)
    event_rows = []
    for p in payload['events']:
        entry = construct_entry(payload, p)
        event_rows.append(entry)

  • take that list of transformed events and insert them into the BigQuery table all at once
    try:
        insert_entries(event_rows)
    except Exception as e:
        print("Error on inserting entries: %s" % e)
        sys.exit()

  • and if it's all successful, return a 200
    response = app.response_class(response='', status=200)
    return response

Here are the utility functions that use the google.cloud.bigquery library to do all that stuff:

  • create a BigQuery table
  • transform each event into an entry
  • load the list of entries into the BigQuery table
def create_table():
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print("Created table {}".format(table.full_table_id))
    return table.full_table_id

def construct_entry(payload, event):
    insights_list = []
    if event.get('video_insights', None):
        for i in event['video_insights']:
            v = {
                'id': vp.get('video_id', ''),
                'time_played': vp.get('video_duration', 0),
            }
            insights_list.append(v)
    entry = {
        'timestamp': event.get('timestamp', None),
        'type': event.get('event_type', ''),
        'id': event.get('event_id', ''),
        'categories': event.get('categories', []),
        'has_insights': event.get('has_insights', False),
        'insights': insights_list,
    }
    return entry

def insert_entries(event_rows):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset)
    table_ref = dataset_ref.table(table_name)
    table = bigquery.Table(table_ref, schema=schema)

    try:
        response = client.insert_rows(table, event_rows)
    except Exception as e:
        print("Error: %s" % str(e))
        return False
    return True

google.cloud.bigquery docs are here: https://googleapis.github.io/google-cloud-python/latest/bigquery/reference.html

Finally here's the main method that uses Flask to run the app and route requests to the events() function:

if __name__ == '__main__':
    app = Flask(__name__)
    app.route('/events', methods=['POST'])(lambda: events(request))
    app.run(debug=True)

Altogether, that's only 100 lines of code! My example does work with a simplified payload, so as always, your mileage may vary.

I only have three libraries in my requirements.txt:

Flask==1.0.2
google-cloud-bigquery==1.3.0
google-cloud-storage==1.6.0

The sample payload:

And here's what you would expect a valid payload to look like for this example:

{
  "site_id": "example.com",
  "another_id": "FE7169C2",
  "events": [
    {
      "timestamp": "2017-07-25T09:15:36Z",
      "event_type": "ARTICLE_VIEW_EVENT",
      "event_id": "28C86A6C-B93F-4445-94D0-5926F6C0F723",
      "categories": ['Technology', 'Computers', 'News'],
      "has_insights": false
    }, {
      "timestamp": "2017-07-25T10:03:12Z",
      "event_type": "ARTICLE_VIEW_EVENT",
      "event_id": "A7ED75A5-475E-44EE-BAD9-3A57D8F547B2",
      "categories": ['Entertainment', 'Games'],
      "has_insights": true
      "video_insights": [
        {
          "video_id": "video1",
          "video_duration": 120
        }
      ]
    }
  ]
}

Deploying:

To deploy from the command line, make sure you're in the folder with the function code. Then run:

gcloud beta functions deploy events --trigger-http --runtime python37 --project my-project-name

Breaking it down:

  • gcloud beta functions deploy is the command that lets you create or update a Google Cloud Function
  • events is the name of the Google Cloud Function (as defined in example source code) that will be executed - you'll also see this in the dashboard
  • we specify --trigger-http to generate an http endpoint where our function can receive requests
  • runtime is the execution environment (there are also node.js and golang options)
  • finally, you need to include your project name

For more info about constructing a deployment, run:

gcloud beta functions deploy --help

When your deploy is successful, you'll see the entry point/http trigger values included in the return message, looking something like this:

entryPoint: events
httpsTrigger:
  url: https://region-my-project-name.cloudfunctions.net/events

And that's it! If you use CURL to post a valid payload to your new endpoint, you should shortly thereafter see a few records in your BigQuery table.

Etcetera:

I would guess that HTTP endpoints are going to be the most common use for Google Cloud Functions, but there are several other trigger types available. For more, take a look at:

https://cloud.google.com/functions/docs/calling/

You should put your code in Github or whatever your choice of repository is, but be aware that GCP also stores the most recent version of the source code. In your project, navigate to the functions dashboard, e.g.:

https://console.cloud.google.com/functions/list?project=my-project-name

And click through to your-function-name >> Source.

If you poke around your function dashboard on the GCP console, you'll also find some fun stuff like usage and activity charts, logging, and a little inline testing module.

Finally, here's a really good primer on GCF. This post is what that got me started:

Serverless Python Quickstart with Google Cloud Functions (Dustin Ingram)