2017-08-05 18:17:15 +00:00
|
|
|
// Copyright 2015 Google Inc. All Rights Reserved.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
package bigquery
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
2017-09-13 12:09:48 +00:00
|
|
|
"math/rand"
|
|
|
|
"os"
|
|
|
|
"sync"
|
2017-08-05 18:17:15 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"cloud.google.com/go/internal"
|
|
|
|
gax "github.com/googleapis/gax-go"
|
|
|
|
"golang.org/x/net/context"
|
|
|
|
bq "google.golang.org/api/bigquery/v2"
|
|
|
|
)
|
|
|
|
|
|
|
|
// A Job represents an operation which has been submitted to BigQuery for processing.
|
|
|
|
type Job struct {
|
|
|
|
c *Client
|
|
|
|
projectID string
|
|
|
|
jobID string
|
|
|
|
|
|
|
|
isQuery bool
|
|
|
|
destinationTable *bq.TableReference // table to read query results from
|
|
|
|
}
|
|
|
|
|
|
|
|
// JobFromID creates a Job which refers to an existing BigQuery job. The job
|
|
|
|
// need not have been created by this package. For example, the job may have
|
|
|
|
// been created in the BigQuery console.
|
|
|
|
func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
|
|
|
|
job, err := c.service.getJob(ctx, c.projectID, id)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
job.c = c
|
|
|
|
return job, nil
|
|
|
|
}
|
|
|
|
|
2017-09-13 12:09:48 +00:00
|
|
|
// ID returns the job's ID.
|
2017-08-05 18:17:15 +00:00
|
|
|
func (j *Job) ID() string {
|
|
|
|
return j.jobID
|
|
|
|
}
|
|
|
|
|
|
|
|
// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

// The states of a Job, in the order in which a job progresses through them.
const (
	// Pending is the first state, before the job is Running.
	Pending State = 0
	// Running follows Pending.
	Running State = 1
	// Done is the final state. A job in this state may have completed
	// successfully or not; see JobStatus.Err.
	Done State = 2
)
|
|
|
|
|
|
|
|
// JobStatus contains the current State of a job, and errors encountered while processing that job.
|
|
|
|
type JobStatus struct {
|
|
|
|
State State
|
|
|
|
|
|
|
|
err error
|
|
|
|
|
|
|
|
// All errors encountered during the running of the job.
|
|
|
|
// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
|
|
|
|
Errors []*Error
|
|
|
|
|
|
|
|
// Statistics about the job.
|
|
|
|
Statistics *JobStatistics
|
|
|
|
}
|
|
|
|
|
|
|
|
// setJobRef initializes job's JobReference if given a non-empty jobID.
|
|
|
|
// projectID must be non-empty.
|
|
|
|
func setJobRef(job *bq.Job, jobID, projectID string) {
|
|
|
|
if jobID == "" {
|
2017-09-13 12:09:48 +00:00
|
|
|
// Generate an ID on the client so that insertJob can be idempotent.
|
|
|
|
jobID = randomJobID()
|
2017-08-05 18:17:15 +00:00
|
|
|
}
|
|
|
|
// We don't check whether projectID is empty; the server will return an
|
|
|
|
// error when it encounters the resulting JobReference.
|
|
|
|
job.JobReference = &bq.JobReference{
|
|
|
|
JobId: jobID,
|
|
|
|
ProjectId: projectID,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-09-13 12:09:48 +00:00
|
|
|
// alphanum is the alphabet from which generated job IDs are drawn.
const alphanum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

// rngMu guards rng: a Rand built on rand.NewSource is not safe for
// concurrent use by multiple goroutines.
var rngMu sync.Mutex

// rng is seeded with the current time XORed with the process ID, so that
// processes started at the same instant are unlikely to share a seed.
var rng = rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(os.Getpid())))

// randomJobID returns a fresh random job ID made of alphanumeric characters.
// It is safe to call from multiple goroutines.
func randomJobID() string {
	// As of August 2017, the BigQuery service uses 27 alphanumeric characters.
	const idLen = 27
	buf := make([]byte, idLen)
	rngMu.Lock()
	defer rngMu.Unlock()
	for i := range buf {
		buf[i] = alphanum[rng.Intn(len(alphanum))]
	}
	return string(buf)
}
|
|
|
|
|
2017-08-05 18:17:15 +00:00
|
|
|
// Done reports whether the job has completed.
|
|
|
|
// After Done returns true, the Err method will return an error if the job completed unsuccesfully.
|
|
|
|
func (s *JobStatus) Done() bool {
|
|
|
|
return s.State == Done
|
|
|
|
}
|
|
|
|
|
|
|
|
// Err returns the error that caused the job to complete unsuccesfully (if any).
|
|
|
|
func (s *JobStatus) Err() error {
|
|
|
|
return s.err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Status returns the current status of the job. It fails if the Status could not be determined.
|
|
|
|
func (j *Job) Status(ctx context.Context) (*JobStatus, error) {
|
|
|
|
js, err := j.c.service.jobStatus(ctx, j.projectID, j.jobID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// Fill in the client field of Tables in the statistics.
|
|
|
|
if js.Statistics != nil {
|
|
|
|
if qs, ok := js.Statistics.Details.(*QueryStatistics); ok {
|
|
|
|
for _, t := range qs.ReferencedTables {
|
|
|
|
t.c = j.c
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return js, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Cancel requests that a job be cancelled. This method returns without waiting for
|
|
|
|
// cancellation to take effect. To check whether the job has terminated, use Job.Status.
|
|
|
|
// Cancelled jobs may still incur costs.
|
|
|
|
func (j *Job) Cancel(ctx context.Context) error {
|
|
|
|
return j.c.service.jobCancel(ctx, j.projectID, j.jobID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait blocks until the job or the context is done. It returns the final status
|
|
|
|
// of the job.
|
|
|
|
// If an error occurs while retrieving the status, Wait returns that error. But
|
|
|
|
// Wait returns nil if the status was retrieved successfully, even if
|
|
|
|
// status.Err() != nil. So callers must check both errors. See the example.
|
|
|
|
func (j *Job) Wait(ctx context.Context) (*JobStatus, error) {
|
|
|
|
if j.isQuery {
|
|
|
|
// We can avoid polling for query jobs.
|
|
|
|
if _, err := j.c.service.waitForQuery(ctx, j.projectID, j.jobID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// Note: extra RPC even if you just want to wait for the query to finish.
|
|
|
|
js, err := j.Status(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return js, nil
|
|
|
|
}
|
|
|
|
// Non-query jobs must poll.
|
|
|
|
var js *JobStatus
|
|
|
|
err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
|
|
|
|
js, err = j.Status(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return true, err
|
|
|
|
}
|
|
|
|
if js.Done() {
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
return false, nil
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return js, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read fetches the results of a query job.
|
|
|
|
// If j is not a query job, Read returns an error.
|
|
|
|
func (j *Job) Read(ctx context.Context) (*RowIterator, error) {
|
|
|
|
if !j.isQuery {
|
|
|
|
return nil, errors.New("bigquery: cannot read from a non-query job")
|
|
|
|
}
|
|
|
|
var projectID string
|
|
|
|
if j.destinationTable != nil {
|
|
|
|
projectID = j.destinationTable.ProjectId
|
|
|
|
} else {
|
|
|
|
projectID = j.c.projectID
|
|
|
|
}
|
|
|
|
|
|
|
|
schema, err := j.c.service.waitForQuery(ctx, projectID, j.jobID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// The destination table should only be nil if there was a query error.
|
|
|
|
if j.destinationTable == nil {
|
|
|
|
return nil, errors.New("bigquery: query job missing destination table")
|
|
|
|
}
|
|
|
|
return newRowIterator(ctx, j.c.service, &readTableConf{
|
|
|
|
projectID: j.destinationTable.ProjectId,
|
|
|
|
datasetID: j.destinationTable.DatasetId,
|
|
|
|
tableID: j.destinationTable.TableId,
|
|
|
|
schema: schema,
|
|
|
|
}), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// JobStatistics contains statistics about a job.
|
|
|
|
type JobStatistics struct {
|
|
|
|
CreationTime time.Time
|
|
|
|
StartTime time.Time
|
|
|
|
EndTime time.Time
|
|
|
|
TotalBytesProcessed int64
|
|
|
|
|
|
|
|
Details Statistics
|
|
|
|
}
|
|
|
|
|
|
|
|
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
//
// Its only method is unexported, so types outside this package cannot
// implement it.
type Statistics interface {
	implementsStatistics()
}
|
|
|
|
|
|
|
|
// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// DestinationURIFileCounts holds the number of files written per
	// destination URI or URI pattern in the extract configuration. The
	// values are in the same order as the URIs specified in the
	// 'destinationUris' field.
	DestinationURIFileCounts []int64
}
|
|
|
|
|
|
|
|
// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// InputFileBytes is the number of bytes of source data in the load job.
	InputFileBytes int64

	// InputFiles is the number of source files in the load job.
	InputFiles int64

	// OutputBytes is the size of the loaded data in bytes. While the load job
	// is in the running state, this value may change.
	OutputBytes int64

	// OutputRows is the number of rows imported by the load job. While the
	// job is in the running state, this value may change.
	OutputRows int64
}
|
|
|
|
|
|
|
|
// QueryStatistics contains statistics about a query job.
|
|
|
|
type QueryStatistics struct {
|
|
|
|
// Billing tier for the job.
|
|
|
|
BillingTier int64
|
|
|
|
|
|
|
|
// Whether the query result was fetched from the query cache.
|
|
|
|
CacheHit bool
|
|
|
|
|
|
|
|
// The type of query statement, if valid.
|
|
|
|
StatementType string
|
|
|
|
|
|
|
|
// Total bytes billed for the job.
|
|
|
|
TotalBytesBilled int64
|
|
|
|
|
|
|
|
// Total bytes processed for the job.
|
|
|
|
TotalBytesProcessed int64
|
|
|
|
|
|
|
|
// Describes execution plan for the query.
|
|
|
|
QueryPlan []*ExplainQueryStage
|
|
|
|
|
|
|
|
// The number of rows affected by a DML statement. Present only for DML
|
|
|
|
// statements INSERT, UPDATE or DELETE.
|
|
|
|
NumDMLAffectedRows int64
|
|
|
|
|
|
|
|
// ReferencedTables: [Output-only, Experimental] Referenced tables for
|
|
|
|
// the job. Queries that reference more than 50 tables will not have a
|
|
|
|
// complete list.
|
|
|
|
ReferencedTables []*Table
|
|
|
|
|
|
|
|
// The schema of the results. Present only for successful dry run of
|
|
|
|
// non-legacy SQL queries.
|
|
|
|
Schema Schema
|
|
|
|
|
|
|
|
// Standard SQL: list of undeclared query parameter names detected during a
|
|
|
|
// dry run validation.
|
|
|
|
UndeclaredQueryParameterNames []string
|
|
|
|
}
|
|
|
|
|
|
|
|
// ExplainQueryStage describes one stage of a query.
|
|
|
|
type ExplainQueryStage struct {
|
|
|
|
// Relative amount of the total time the average shard spent on CPU-bound tasks.
|
|
|
|
ComputeRatioAvg float64
|
|
|
|
|
|
|
|
// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
|
|
|
|
ComputeRatioMax float64
|
|
|
|
|
|
|
|
// Unique ID for stage within plan.
|
|
|
|
ID int64
|
|
|
|
|
|
|
|
// Human-readable name for stage.
|
|
|
|
Name string
|
|
|
|
|
|
|
|
// Relative amount of the total time the average shard spent reading input.
|
|
|
|
ReadRatioAvg float64
|
|
|
|
|
|
|
|
// Relative amount of the total time the slowest shard spent reading input.
|
|
|
|
ReadRatioMax float64
|
|
|
|
|
|
|
|
// Number of records read into the stage.
|
|
|
|
RecordsRead int64
|
|
|
|
|
|
|
|
// Number of records written by the stage.
|
|
|
|
RecordsWritten int64
|
|
|
|
|
|
|
|
// Current status for the stage.
|
|
|
|
Status string
|
|
|
|
|
|
|
|
// List of operations within the stage in dependency order (approximately
|
|
|
|
// chronological).
|
|
|
|
Steps []*ExplainQueryStep
|
|
|
|
|
|
|
|
// Relative amount of the total time the average shard spent waiting to be scheduled.
|
|
|
|
WaitRatioAvg float64
|
|
|
|
|
|
|
|
// Relative amount of the total time the slowest shard spent waiting to be scheduled.
|
|
|
|
WaitRatioMax float64
|
|
|
|
|
|
|
|
// Relative amount of the total time the average shard spent on writing output.
|
|
|
|
WriteRatioAvg float64
|
|
|
|
|
|
|
|
// Relative amount of the total time the slowest shard spent on writing output.
|
|
|
|
WriteRatioMax float64
|
|
|
|
}
|
|
|
|
|
|
|
|
// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Kind is the machine-readable operation type.
	Kind string

	// Substeps holds human-readable stage descriptions.
	Substeps []string
}
|
|
|
|
|
|
|
|
func (*ExtractStatistics) implementsStatistics() {}
|
|
|
|
func (*LoadStatistics) implementsStatistics() {}
|
|
|
|
func (*QueryStatistics) implementsStatistics() {}
|