сделал свое дело. Огромное спасибо.

довалИспользуйте коннектор BigQuery с Spark чтобы успешно получать данные из общедоступного набора данных. Теперь мне нужно получить доступ к набору данных bigquery, который принадлежит одному из наших клиентов и для которого мне был предоставлен файл ключа учетной записи службы (я знаю, что файл ключа учетной записи службы действителен, поскольку я могу использовать его для подключения с использованиемБиблиотека Google BigQuery для Python).

Я следовал совету Игоря ДворжакаВот

Для использования авторизации файла ключа учетной записи службы необходимо установитьmapred.bq.auth.service.account.enable свойство true и указать соединитель BigQuery на файл ключа json учетной записи службы, используяmapred.bq.auth.service.account.json.keyfile свойство

вот так:

from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.appName("SparkSessionBQExample").enableHiveSupport().getOrCreate()

bucket = spark._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = spark._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory =     'gs://{}/hadoop/tmp/bigquery/pyspark_input{}'.format(bucket, datetime.now().strftime("%Y%m%d%H%M%S"))

project_id = 'clientproject'#'publicdata'
dataset_id = 'clientdataset'#samples'
table_id = 'clienttable'#'shakespeare'
conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true'
}

# Load data in from BigQuery.
table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

print ('row tally={}'.format(table_data.toDF().count()))

Я разместил файл ключа учетной записи службы в/tmp/keyfile.json на главном узле и на всех рабочих узлах кластера я отправляю свою работу следующим образом:

gcloud dataproc jobs submit pyspark \
    ./bq_pyspark.py  \
    --cluster $CLUSTER \
    --region $REGION \
    --properties=spark.hadoop.mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json

Я также попробовал:

gcloud dataproc jobs submit pyspark \
    ./bq_pyspark.py  \
    --cluster $CLUSTER \
    --region $REGION \
    --properties=spark.hadoop.mapred.bq.auth.service.account.json.keyfile=/tmp/keyfile.json,spark.hadoop.mapred.bq.auth.service.account.enable=true

Вот соответствующие разделы результатов работы:

Коннектор Bigquery версии 0.10.7-hadoop2
18/11/07 13:36:47 ИНФОРМАЦИЯ com.google.cloud.hadoop.io.bigquery.BigQueryFactory: создание BigQuery из учетных данных по умолчанию.
18/11/07 13:36:47 ИНФОРМАЦИЯ com.google.cloud.hadoop.io.bigquery.BigQueryFactory: создание BigQuery из заданных учетных данных.
18/11/07 13:36:47 ИНФОРМАЦИЯ com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration: использование рабочего пути: 'gs: // dataproc-9e5dc592-1a35-42e6-9dd6-5f9dd9c8df87-europe-west1 / Hadoop / TMP / BigQuery / pyspark_input20181107133646'
Traceback (последний вызов был последним):
Файл "/tmp/b6973a26c76d4069a86806dfbd2d7d0f/bq_pyspark.py", строка 30, в
конф = CONF)
Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", строка 702, в newAPIHadoopRDD
Файл "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", строка 1133, ввызов
Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", строка 63, в формате deco
Файл "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", строка 319, в get_return_value
py4j.protocol.Py4JJavaError: Произошла ошибка при вызове z: org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 запрещено
{
«код»: 403,
"ошибки": [{
"домен": "глобальный",
"message": "Доступ запрещен: таблица clientproject: clientdatatset.clienttable: пользователь [email protected] не имеет разрешения bigquery.tables.get для таблицы clientproject: clientdatatset.clienttable.",
"причина": "accessDenied"
}],
"message": "Доступ запрещен: таблица clientproject: clientdatatset.clienttable: пользователь [email protected] не имеет разрешения bigquery.tables.get для таблицы clientproject: clientdatatset.clienttable."
}

Линия

18/11/07 13:36:47 ИНФОРМАЦИЯ com.google.cloud.hadoop.io.bigquery.BigQueryFactory: создание BigQuery из учетных данных по умолчанию.

возможно, предполагает, что я не передаю учетные данные из файла ключа учетной записи службы, поэтому, я думаю, я неправильно понял, что сказал Игорь (или какая-то информация отсутствует).

Если кто-нибудь может сообщить мне, где я иду не так, я буду очень признателен.

ОБНОВЛЕНИЕ ... Я попытался предоставить требуемую конфигурацию аутентификации через код, а не через командную строку:

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true',
    'mapred.bq.auth.service.account.keyfile': '/tmp/keyfile.json',
    'mapred.bq.auth.service.account.email': '[email protected]'
}

На этот раз я получил другую ошибку:

18/11/07 16:44:21 ИНФОРМАЦИЯ com.google.cloud.hadoop.io.bigquery.BigQueryFactory: создание BigQuery из учетных данных по умолчанию.
Traceback (последний вызов был последним):
Файл "/tmp/cb5cbb16d59945dd926cab2c1f2f5524/bq_pyspark.py", строка 39, в
конф = CONF)
Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", строка 702, в newAPIHadoopRDD
Файл "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", строка 1133, ввызов
Файл "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", строка 63, в формате deco
Файл "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", строка 319, в get_return_value py4j.protocol.Py4JJavaError: Произошла ошибка при вызове z: org. apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.io.IOException: toDerInputStream отклоняет тип тега 123
at sun.security.util.DerValue.toDerInputStream (DerValue.java:881)
at sun.security.pkcs12.PKCS12KeyStore.engineLoad (PKCS12KeyStore.java:1939)
в java.security.KeyStore.load (KeyStore.java:1445)
на com.google.api.client.util.SecurityUtils.loadKeyStore (SecurityUtils.java:82)
на com.google.api.client.util.SecurityUtils.loadPrivateKeyFromKeyStore (SecurityUtils.java:115)
на com.google.api.client.googleapis.auth.oauth2.GoogleCredential $ Builder.setServiceAccountPrivateKeyFromP12File (GoogleCredential.java:670)
на com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromPrivateKeyServiceAccount (CredentialFactory.java:251)
на com.google.cloud.hadoop.util.CredentialConfiguration.getCredential (CredentialConfiguration.java:100)
на com.google.cloud.hadoop.io.bigquery.BigQueryFactory.createBigQueryCredential (BigQueryFactory.java:95)
на com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQuery (BigQueryFactory.java:115)
на com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQueryHelper (BigQueryFactory.java:103)

Я погуглил "toDerInputStream отклоняет тип тега 123", что привело меня кtoDerInputStream отклоняет тип тега 123 что предполагает, что мне нужно аутентифицироваться с использованием файла P12. Это согласуется с упоминаниемsun.security.pkcs12.PKCS12KeyStore в стеке вызовов. Следовательно, я думаю, что мне нужен файл P12 (он же файл формата PKCS # 12), а не файл .json, что означает, что мне нужно вернуться к клиенту, чтобы попросить об этом - и из опыта я думаю, что это может занять некоторое время получить файл P12. Я сообщу, если / когда я доберусь куда-нибудь.

ОБНОВЛЕНИЕ 2 ... понял, с помощью Игоря. Я неправильно указалmapred.bq.auth.service.account.keyfileдолжно было бытьmapred.bq.auth.service.account.json.keyfile, Таким образом, соответствующий раздел кода становится:

conf = {
    # Input Parameters.
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': project_id,
    'mapred.bq.input.dataset.id': dataset_id,
    'mapred.bq.input.table.id': table_id,
    'mapred.bq.auth.service.account.enable': 'true',
    'mapred.bq.auth.service.account.json.keyfile': '/tmp/keyfile.json'
}
table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

и команда отправки просто

gcloud dataproc jobs submit pyspark \
    ./bq_pyspark.py  \
    --cluster $CLUSTER \
    --region $REGION

Теперь он работает, я могу получить доступ к данным в biquery из spark-on-dataproc, аутентификация с использованием файла ключа json учетной записи службы. Спасибо Игорь.

Ответы на вопрос(1)

Ваш ответ на вопрос