Hi everyone! In this post I'm going to show you how to enable the BigQuery Storage API. With this option, you can fetch large query results much faster. I was working with BigQuery, and in some cases I needed to retrieve large result sets.

I’m going to assume that you already have a Google Cloud project with BigQuery enabled. For this example I’m going to use a public dataset, but it doesn’t matter which dataset you use.

For example, the table queried below, `bigquery-public-data.bitcoin_blockchain.blocks`, has 540,584 records.

Code without the Storage API


package main

import (
    "context"
    "fmt"
    "time"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
    "google.golang.org/api/option"
)

// Package-level state shared by the BigQuery helper functions.
var (
	// client is the BigQuery client created by connectClientToBq and
	// released by closeClientBq.
	client *bigquery.Client

	// ProjectId is cleared by closeClientBq. Nothing in this file assigns it
	// a non-empty value; main passes its own local projectId instead.
	ProjectId string

	// CREDENTIALS_DIR is the path passed to option.WithCredentialsFile.
	// Despite the name, it refers to a credentials JSON file, not a directory.
	CREDENTIALS_DIR = "credential.json"
)

// timeTracker prints the time elapsed since start, labelled with name, as a
// single line on standard output. Call it as
//
//	defer timeTracker(time.Now(), "label")
//
// so that the measured span covers the rest of the enclosing function.
func timeTracker(start time.Time, name string) {
	fmt.Printf("==== %s -> time execute: %s ===\n", name, time.Since(start))
}

func connectClientToBq(ctx context.Context, projectId string) error {
    var err error
    path := CREDENTIALS_DIR

    credentialOpts := option.WithCredentialsFile(path)
    client, err = bigquery.NewClient(ctx, projectId, credentialOpts)
    if err != nil {
        return err
    }

    return nil
}

// closeClientBq clears ProjectId and releases the package-level BigQuery
// client, if one exists. It sets client to nil afterwards, so calling it
// more than once does not close the same client twice.
func closeClientBq() {
	ProjectId = ""

	if client == nil {
		return
	}

	if err := client.Close(); err != nil {
		// Best-effort cleanup: this runs from a defer in main, so report the
		// failure rather than silently dropping it.
		fmt.Println("closing BigQuery client:", err)
	}
	client = nil
}

// executeQueryWithJob runs sql, with the given query parameters, as a
// BigQuery job on the package-level client. It blocks until the job
// finishes, then returns an iterator over the job's result rows.
//
// It returns an error if the job cannot be started, if waiting for it fails,
// or if the finished job reports an error in its status. The elapsed time is
// printed under the label "ExecuteQueryWithJob". That time covers starting,
// waiting and obtaining the iterator, but not reading the rows.
func executeQueryWithJob(ctx context.Context, sql string, params []bigquery.QueryParameter) (*bigquery.RowIterator, error) {
	defer timeTracker(time.Now(), "ExecuteQueryWithJob")

	query := client.Query(sql)
	query.Parameters = params

	job, err := query.Run(ctx)
	if err != nil {
		return nil, err
	}

	// A failed wait and a job that completed with an error are both fatal.
	// Only consult the status when the wait itself succeeded.
	status, err := job.Wait(ctx)
	if err == nil {
		err = status.Err()
	}
	if err != nil {
		return nil, err
	}

	return job.Read(ctx)
}

// main connects to BigQuery, runs a query over the public bitcoin blocks
// table, and reads every result row. The "Finish" timer therefore reports the
// total time including the full read.
//
// On any error it prints the error and returns early. The deferred timer and
// client cleanup still run.
func main() {
	defer closeClientBq()
	defer timeTracker(time.Now(), "Finish")

	var params []bigquery.QueryParameter
	ctx := context.Background()
	projectId := "demos-403423"

	if err := connectClientToBq(ctx, projectId); err != nil {
		// Without a client, the query below would dereference a nil pointer.
		fmt.Println(err.Error())
		return
	}

	sql := "SELECT difficultyTarget, version, work_terahash, block_id FROM `bigquery-public-data.bitcoin_blockchain.blocks`"
	data, err := executeQueryWithJob(ctx, sql, params)
	if err != nil {
		// data is nil when an error is returned; iterating it would panic.
		fmt.Println(err.Error())
		return
	}

	// Drain all rows; the values are discarded because only the read time
	// is being measured.
	for {
		var values []bigquery.Value
		err := data.Next(&values)
		if err == iterator.Done {
			break
		}
		if err != nil {
			// A failed Google API iterator keeps returning the same error, so
			// continuing here would loop forever printing it.
			fmt.Println(err.Error())
			return
		}
	}
}
// NOTE(review): the lines below are a stray duplicate of the tail of main
// (the query call through the row-reading loop), apparently left over from
// pasting the program into this post. They sit outside any function and will
// not compile; remove them when copying this code into a real main.go.
"SELECT difficultyTarget, version, work_terahash, block_id FROM `bigquery-public-data.bitcoin_blockchain.blocks`"
    data, err := executeQueryWithJob(ctx, sql, params)

    if err != nil {
        fmt.Println(err.Error())
    }

    for {
        var values []bigquery.Value
        err := data.Next(&values)

        if err == iterator.Done {
            break
        }
        if err != nil {
            fmt.Println(err.Error())
        }
    }
}

For this query, the time to run the job and wait for it to finish (the ExecuteQueryWithJob timer) was:

==== ExecuteQueryWithJob -> time execute: 1.939979125s ===

The total execution time, including reading every row (the Finish timer), was:

==== Finish -> time execute: 1m30.853989167s ===

Storage API

Now enable the Storage API by adding a call to client.EnableStorageReadClient in the function connectClientToBq:

func connectClientToBq(ctx context.Context, projectId string) error {
    var err error
    path := CREDENTIALS_DIR

    credentialOpts := option.WithCredentialsFile(path)
    client, err = bigquery.NewClient(ctx, projectId, credentialOpts)

    if err != nil {
        return err
    }

    err = client.EnableStorageReadClient(ctx, credentialOpts)
    if err != nil {
        return err
    }

    return nil
}

With the Storage API enabled, running the query job took slightly longer, but reading the results was much faster:

==== ExecuteQueryWithJob -> time execute: 3.695876083s ===

The total execution time, including reading every row (the Finish timer), was:

==== Finish -> time execute: 23.907911666s ===

I hope this post is useful to you. If it is, please share it.

Resources

Deja un comentario

Blog de WordPress.com.