Я объединил сырые данные (rows) со схемой таблицы BigQuery.
Вот мой код:
defmodule Google.BigQuery.Utils do
  @moduledoc """
  Merges raw BigQuery result rows with their `TableSchema`, producing plain
  maps keyed by column name with values converted to native Elixir types.
  """

  # Values the BigQuery API uses to mean "no data" for a cell / field list.
  @empty [nil, %{}, []]

  @doc """
  Merges `rows` with `schema`, returning a list of maps (one per row),
  or `nil` when there are no rows.
  """
  def merge_rows_with_schema(nil, nil), do: nil
  def merge_rows_with_schema(%GoogleApi.BigQuery.V2.Model.TableSchema{} = _schema, nil), do: nil

  def merge_rows_with_schema(%GoogleApi.BigQuery.V2.Model.TableSchema{} = schema, rows) do
    rows
    |> struct_to_map()
    |> Enum.map(&merge_row_fields(schema.fields, &1["f"]))
  end

  # Pairs each cell of a row with its schema field (positionally) and
  # builds a %{column_name => converted_value} map.
  defp merge_row_fields(_schema_fields, fields) when fields in @empty, do: nil

  defp merge_row_fields(schema_fields, fields) do
    schema_fields
    |> Enum.zip(fields)
    |> Map.new(fn {schema_field, field} -> merge_row(schema_field, field) end)
  end

  @doc """
  Converts one cell `field` according to `schema_field`, returning a
  `{column_name, converted_value}` tuple.
  """
  def merge_row(schema_field, field) do
    converted_val = convert(schema_field.mode, schema_field.type, schema_field, field)
    {schema_field.name, converted_val}
  end

  @doc false
  def convert(_mode, _type, _schema, value) when value in @empty, do: nil

  # REPEATED columns arrive as %{"v" => [elem, ...]}; convert each element.
  def convert("REPEATED", type, schema, field) do
    Enum.map(field["v"], &convert(nil, type, schema, &1))
  end

  # RECORD columns recurse into the nested field list with the nested schema.
  def convert(_mode, "RECORD", schema, field) do
    merge_row_fields(schema.fields, field["v"]["f"])
  end

  def convert(_mode, _type, schema, field) do
    convert_primitive(schema.type, field["v"])
  end

  @doc """
  Converts a scalar BigQuery value (always transported as a string)
  to the corresponding native Elixir type.
  """
  def convert_primitive(_type, value) when value in @empty, do: nil
  def convert_primitive("STRING", value), do: value
  def convert_primitive("BOOLEAN", value), do: value == "true"
  def convert_primitive("FLOAT", value), do: String.to_float(value)
  def convert_primitive("INTEGER", value), do: String.to_integer(value)

  def convert_primitive("TIMESTAMP", value) do
    # BigQuery encodes timestamps as epoch seconds in a float string
    # (e.g. "1.528028549735E9"); scale to ms so sub-second precision
    # survives the truncation to integer.
    (String.to_float(value) * 1000)
    |> trunc()
    |> DateTime.from_unix!(:millisecond)
    |> to_string()
  end

  @doc false
  # Backward-compatible delegate for existing callers of the misspelled name.
  def convert_primtive(type, value), do: convert_primitive(type, value)

  @doc """
  Round-trips a struct (or list of structs) through JSON to get plain
  string-keyed maps.
  """
  def struct_to_map(struct), do: struct |> Poison.encode!() |> Poison.decode!()
end
Вот модуль, который его использует:
defmodule Google.BigQuery do
  @moduledoc """
  Thin wrapper around the BigQuery jobs API for running synchronous queries.
  """

  alias Google.Connection
  alias GoogleApi.BigQuery.V2.Model.QueryResponse

  # Server-side timeout for a synchronous query, in milliseconds.
  @timeout 40_000

  @doc """
  Runs a BigQuery SQL query synchronously and returns the result rows as maps.

  ## Example

      project = "bigquery-public-data"
      query_str = "SELECT * FROM `usa_names.usa_1910_2013` WHERE name = 'Yoshiro'"
      Google.BigQuery.query(project, query_str, false)
      #=> {:ok, [%{"state" => "CA", "gender" => "M", "year" => "1922",
      #            "name" => "Yoshiro", "number" => "5"}]}
  """
  @spec query(String.t(), String.t(), boolean()) :: {:ok, [map] | nil} | {:error, any}
  def query(project, query_string, use_legacy_sql? \\ true) do
    query_request = %GoogleApi.BigQuery.V2.Model.QueryRequest{
      query: query_string,
      useLegacySql: use_legacy_sql?,
      timeoutMs: @timeout
    }

    Connection.get()
    |> GoogleApi.BigQuery.V2.Api.Jobs.bigquery_jobs_query(project, body: query_request)
    |> process_query_response()
  end

  # Normalizes the API result: unwrap Tesla error bodies, merge rows with schema.
  defp process_query_response({:error, %Tesla.Env{body: body}}), do: {:error, body}
  defp process_query_response({:error, reason}), do: {:error, reason}

  defp process_query_response({:ok, %QueryResponse{rows: rows, schema: schema}}) do
    {:ok, Google.BigQuery.Utils.merge_rows_with_schema(schema, rows)}
  end
end
Привожу также мой модуль подключения:
defmodule Google.Connection do
  @moduledoc false

  @scope "https://www.googleapis.com/auth/cloud-platform"

  @doc """
  Builds an authenticated Tesla connection for Google API requests.

  Fetches an access token from Goth's token server (Goth itself caches
  and refreshes tokens) and wraps it in a connection struct. Raises via
  match failure if the token cannot be obtained.

  NOTE(review): this uses `GoogleApi.Storage.V1.Connection` even though the
  callers target BigQuery — confirm the base URL is overridden downstream,
  or switch to the BigQuery connection module.
  """
  def get() do
    {:ok, token} = Goth.Token.for_scope(@scope)
    GoogleApi.Storage.V1.Connection.new(token.token)
  end
end
А вот тесты:
defmodule Google.BigQuery.UtilsTest do
  # No shared global state is touched, so the suite can run concurrently.
  use ExUnit.Case, async: true

  alias Google.BigQuery.Utils, as: BigQueryUtils

  describe "merge_rows_with_schema/2" do
    test "converts nested records, repeated records and primitive types" do
      # Schema: a NULLABLE RECORD `a`, a REPEATED RECORD `det`,
      # a TIMESTAMP `upload_time` and an INTEGER `corr_id`.
      schema =
        %GoogleApi.BigQuery.V2.Model.TableSchema{
          fields: [
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: [
                %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
                  description: nil,
                  fields: nil,
                  mode: "NULLABLE",
                  name: "c",
                  type: "INTEGER"
                },
                %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
                  description: nil,
                  fields: nil,
                  mode: "NULLABLE",
                  name: "b",
                  type: "INTEGER"
                }
              ],
              mode: "NULLABLE",
              name: "a",
              type: "RECORD"
            },
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: [
                %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
                  description: nil,
                  fields: nil,
                  mode: "NULLABLE",
                  name: "age",
                  type: "INTEGER"
                },
                %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
                  description: nil,
                  fields: nil,
                  mode: "NULLABLE",
                  name: "name",
                  type: "STRING"
                }
              ],
              mode: "REPEATED",
              name: "det",
              type: "RECORD"
            },
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: nil,
              mode: "NULLABLE",
              name: "upload_time",
              type: "TIMESTAMP"
            },
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: nil,
              mode: "NULLABLE",
              name: "corr_id",
              type: "INTEGER"
            }
          ]
        }

      # One raw row, in the positional cell layout returned by BigQuery.
      row =
        [
          %GoogleApi.BigQuery.V2.Model.TableRow{
            f: [
              %GoogleApi.BigQuery.V2.Model.TableCell{
                v: %{"f" => [%{"v" => "3"}, %{"v" => "1"}]}
              },
              %GoogleApi.BigQuery.V2.Model.TableCell{
                v: [
                  %{"v" => %{"f" => [%{"v" => "23"}, %{"v" => "bob"}]}},
                  %{"v" => %{"f" => [%{"v" => "21"}, %{"v" => "jim"}]}}
                ]
              },
              %GoogleApi.BigQuery.V2.Model.TableCell{v: "1.528028549735E9"},
              %GoogleApi.BigQuery.V2.Model.TableCell{v: "1234"}
            ]
          }
        ]

      expected_row = [
        %{
          "a" => %{"b" => 1, "c" => 3},
          "corr_id" => 1234,
          "det" => [
            %{"age" => 23, "name" => "bob"},
            %{"age" => 21, "name" => "jim"}
          ],
          "upload_time" => "2018-06-03 12:22:29.735Z"
        }
      ]

      assert BigQueryUtils.merge_rows_with_schema(schema, row) == expected_row
    end

    test "converts a REPEATED column of primitive type into a list" do
      row = [
        %GoogleApi.BigQuery.V2.Model.TableRow{
          f: [
            %GoogleApi.BigQuery.V2.Model.TableCell{v: [%{"v" => "1"}, %{"v" => "2"}]},
            %GoogleApi.BigQuery.V2.Model.TableCell{v: "22222"}
          ]
        }
      ]

      schema =
        %GoogleApi.BigQuery.V2.Model.TableSchema{
          fields: [
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: nil,
              mode: "REPEATED",
              name: "a",
              type: "INTEGER"
            },
            %GoogleApi.BigQuery.V2.Model.TableFieldSchema{
              description: nil,
              fields: nil,
              mode: "NULLABLE",
              name: "corr_id",
              type: "INTEGER"
            }
          ]
        }

      expected_row = [%{"a" => [1, 2], "corr_id" => 22222}]

      assert BigQueryUtils.merge_rows_with_schema(schema, row) == expected_row
    end
  end

  describe "convert_primtive/2" do
    test "converts an epoch-seconds float string to a datetime string" do
      unix_timestamp = "1.528028549735E9"

      assert BigQueryUtils.convert_primtive("TIMESTAMP", unix_timestamp) ==
               "2018-06-03 12:22:29.735Z"
    end
  end
end