Execute código PySpark em blocos de notas do BigQuery Studio
Este documento mostra como executar código PySpark num bloco de notas Python do BigQuery.
Antes de começar
Se ainda não o fez, crie um Google Cloud projeto e um contentor do Cloud Storage.
Configure o seu projeto
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. -
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles. Crie um contentor do Cloud Storage no seu projeto se não tiver um que possa usar.
Configure o seu notebook
- Credenciais do bloco de notas: por predefinição, a sessão do bloco de notas usa as suas credenciais de utilizador. Em alternativa, pode usar credenciais da conta de serviço da sessão.
- Credenciais do utilizador: a sua conta de utilizador tem de ter as seguintes funções de gestão de identidades e acessos:
- Editor do Dataproc (função
roles/dataproc.editor
) - Utilizador do BigQuery Studio (função
roles/bigquery.studioUser
) - Função de utilizador da conta de serviço (roles/iam.serviceAccountUser)
na conta de serviço da sessão.
Esta função contém a autorização
iam.serviceAccounts.actAs
necessária para usar a identidade da conta de serviço.
- Editor do Dataproc (função
- Credenciais da conta de serviço: se quiser especificar credenciais da conta de serviço em vez de credenciais do utilizador para a sessão do bloco de notas, a conta de serviço da sessão tem de ter a seguinte função:
- Credenciais do utilizador: a sua conta de utilizador tem de ter as seguintes funções de gestão de identidades e acessos:
- Tempo de execução do bloco de notas: o seu bloco de notas usa um tempo de execução do Vertex AI predefinido, a menos que selecione um tempo de execução diferente. Se quiser definir o seu próprio tempo de execução, crie o tempo de execução a partir da página Tempos de execução na Google Cloud consola. Tenha em atenção que, quando usar a biblioteca NumPy, use a versão 1.26 do NumPy, que é suportada pelo Spark 3.5, no tempo de execução do bloco de notas.
- Credenciais do bloco de notas: por predefinição, a sessão do bloco de notas usa as suas credenciais de utilizador. Em alternativa, pode usar credenciais da conta de serviço da sessão.
Na Google Cloud consola, aceda à página BigQuery.
Na barra de separadores do painel de detalhes, clique na seta junto ao sinal + e, de seguida, clique em Bloco de notas.
- Configure e crie uma única sessão no bloco de notas.
- Configure uma sessão do Spark num
modelo de sessão interativa e, em seguida, use o modelo para configurar e criar uma sessão no bloco de notas.
O BigQuery oferece uma funcionalidade
Query using Spark
que ajuda a começar a programar a sessão baseada em modelos, conforme explicado no separador Sessão do Spark baseada em modelos. Na barra de separadores do painel do editor, clique no menu pendente de seta junto ao sinal + e, de seguida, clique em Bloco de notas.
Copie e execute o seguinte código numa célula do bloco de notas para configurar e criar uma sessão básica do Spark.
- APP_NAME: um nome opcional para a sua sessão.
- Definições de sessão opcionais: pode adicionar definições da API Dataproc
Session
para personalizar a sessão. Seguem-se alguns exemplos:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
- Na barra de separadores do painel do editor, clique no menu pendente de seta junto ao sinal + e, de seguida, clique em Bloco de notas.
- Em Começar com um modelo, clique em Consultar com o Spark e, de seguida, clique em
Usar modelo para inserir o código no seu bloco de notas.
- Especifique as variáveis conforme explicado nas Notas.
- Pode eliminar quaisquer células de código de exemplo adicionais inseridas no bloco de notas.
- PROJECT: o ID do projeto, que está listado na secção Informações do projeto do Google Cloud painel de controlo da consola.
- LOCATION: a região do Compute Engine onde a sessão do bloco de notas vai ser executada. Se não for fornecida, a localização predefinida é a região da VM que cria o bloco de notas.
SESSION_TEMPLATE: o nome de um modelo de sessão interativa existente. As definições de configuração da sessão são obtidas a partir do modelo. O modelo também tem de especificar as seguintes definições:
- Versão do tempo de execução
2.3
+ Tipo de bloco de notas:
Spark Connect
Exemplo:
- Versão do tempo de execução
APP_NAME: um nome opcional para a sua sessão.
- Executar uma contagem de palavras num conjunto de dados público de Shakespeare.
- Crie uma tabela Iceberg com metadados guardados no metastore do BigLake.
- APP_NAME: um nome opcional para a sua sessão.
- PROJECT: o ID do projeto, que está listado na secção Informações do projeto do Google Cloud painel de controlo da consola.
- REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Serverless para Apache Spark ativa o Acesso privado do Google (PGA) na sub-rede especificada.
- LOCATION: A predefinição
BigQuery_metastore_config.location
espark.sql.catalog.{catalog}.gcp_location
éUS
, mas pode escolher qualquer localização do BigQuery suportada. - BUCKET e WAREHOUSE_DIRECTORY: o contentor do Cloud Storage e a pasta usados para o diretório do armazém Iceberg.
- CATALOG_NAME e NAMESPACE: o nome do catálogo e o espaço de nomes do Iceberg combinam-se para identificar a tabela Iceberg (
catalog.namespace.table_name
). - APP_NAME: um nome opcional para a sua sessão.
Na Google Cloud consola, aceda à página BigQuery.
No painel de recursos do projeto, clique no seu projeto e, de seguida, clique no seu espaço de nomes para listar a tabela
sample_iceberg_table
. Clique na tabela Detalhes para ver as informações de Configuração da tabela do catálogo aberto.Os formatos de entrada e saída são os formatos de classe padrão do Hadoop
InputFormat
eOutputFormat
que o Iceberg usa.Insira uma nova célula de código clicando em + Código na barra de ferramentas. A nova célula de código apresenta
Start coding or generate with AI
. Clique em gerar.No editor de geração, introduza um comando de linguagem natural e, de seguida, clique em
enter
. Certifique-se de que inclui a palavra-chavespark
oupyspark
no comando.Exemplo de comando:
create a spark dataframe from order_items and filter to orders created in 2024
Exemplo de saída:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Para permitir que o Gemini Code Assist obtenha tabelas e esquemas relevantes, ative a sincronização do Data Catalog para instâncias do Dataproc Metastore.
Certifique-se de que a sua conta de utilizador tem acesso ao Data Catalog às tabelas de consulta. Para isso, atribua a função
DataCatalog.Viewer
.- Execute
spark.stop()
numa célula do bloco de notas. - Termine o tempo de execução no bloco de notas:
- Clique no seletor de tempo de execução e, de seguida, clique em Gerir sessões.
- Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e, de seguida, clique em Encerrar.
- Clique no seletor de tempo de execução e, de seguida, clique em Gerir sessões.
Agende código de blocos de notas a partir da Google Cloud consola (aplicam-se os preços dos blocos de notas).
Execute código de bloco de notas como uma carga de trabalho em lote (aplicam-se os preços do Serverless para Apache Spark).
- Agende o notebook.
- Se a execução de código do bloco de notas fizer parte de um fluxo de trabalho, agende o bloco de notas como parte de um pipeline.
Transfira o código do bloco de notas para um ficheiro num terminal local ou no Cloud Shell.
Abra o bloco de notas no painel Explorador na página BigQuery Studio na Google Cloud consola.
Transfira o código do bloco de notas selecionando Transferir no menu Ficheiro e, de seguida, escolha
Download .py
.
Gere
requirements.txt
.- Instale o
pipreqs
no diretório onde guardou o ficheiro.py
.pip install pipreqs
Execute
pipreqs
para gerarrequirements.txt
.pipreqs filename.py
Use a CLI do Google Cloud para copiar o ficheiro
requirements.txt
local para um contentor no Cloud Storage.gcloud storage cp requirements.txt gs://BUCKET/
- Instale o
Atualize o código da sessão do Spark editando o ficheiro
.py
transferido.Remova ou comente todos os comandos de script de shell.
Remova o código que configura a sessão do Spark e, em seguida, especifique os parâmetros de configuração como parâmetros de envio da carga de trabalho em lote. (consulte o artigo Envie uma carga de trabalho em lote do Spark).
Exemplo:
Remova a seguinte linha de configuração da sub-rede da sessão do código:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
Quando executar a carga de trabalho em lote, use a flag
--subnet
para especificar a sub-rede.gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
Use um fragmento do código de criação de sessão simples.
Exemplo de código do bloco de notas transferido antes da simplificação.
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
Código de carga de trabalho em lote após a simplificação.
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
Execute a carga de trabalho em lote.
Consulte o artigo Envie a carga de trabalho em lote do Spark para ver instruções.
Certifique-se de que inclui a flag --deps-bucket para apontar para o contentor do Cloud Storage que contém o ficheiro Your
requirements.txt
.Exemplo:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
Notas:
- FILENAME: o nome do ficheiro de código do bloco de notas transferido e editado.
- REGION: a região do Compute Engine onde o cluster está localizado.
- BUCKET O nome do contentor do Cloud Storage
que contém o seu ficheiro
requirements.txt
. --version
: spark runtime version 2.3 está selecionado para executar a carga de trabalho em lote.
Confirme o seu código.
- Depois de testar o código da carga de trabalho em lote, pode consolidar o ficheiro
.ipynb
ou.py
no seu repositório através do clientegit
, como o GitHub, o GitLab ou o Bitbucket, como parte do pipeline de CI/CD.
- Depois de testar o código da carga de trabalho em lote, pode consolidar o ficheiro
Agende a sua carga de trabalho em lote com o Cloud Composer.
- Consulte o artigo Execute cargas de trabalho sem servidor para Apache Spark com o Cloud Composer para ver instruções.
- Vídeo de demonstração do YouTube: Liberte o poder do Apache Spark integrado com o BigQuery.
- Use o metastore do BigLake com o Dataproc
- Use o metastore do BigLake com o Serverless para Apache Spark
Preços
Para informações sobre preços, consulte os preços de tempo de execução do bloco de notas do BigQuery.
Abra um notebook Python do BigQuery Studio
Crie uma sessão do Spark num notebook do BigQuery Studio
Pode usar um bloco de notas Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada bloco de notas do BigQuery Studio só pode ter uma sessão do Spark ativa associada.
Pode criar uma sessão do Spark num bloco de notas Python do BigQuery Studio das seguintes formas:
Sessão única
Para criar uma sessão do Spark num novo notebook, faça o seguinte:
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Substitua o seguinte:
Sessão do Spark com modelo
Pode introduzir e executar o código numa célula do bloco de notas para criar uma sessão do Spark com base num modelo de sessão existente. Quaisquer
session
definições de configuração que fornecer no código do bloco de notas vão substituir as mesmas definições que estão definidas no modelo de sessão.Para começar rapidamente, use o
Query using Spark
modelo para pré-preencher o seu bloco de notas com código de modelo de sessão do Spark:from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.functions as f session = Session() # Configure the session with an existing session template. session_template = "SESSION_TEMPLATE" session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}" # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Escreva e execute código PySpark no seu bloco de notas do BigQuery Studio
Depois de criar uma sessão do Spark no bloco de notas, use a sessão para executar o código do bloco de notas do Spark no bloco de notas.
Suporte da API PySpark do Spark Connect: a sessão do bloco de notas do Spark Connect suporta a maioria das APIs PySpark, incluindo DataFrame, Functions e Column, mas não suporta SparkContext e RDD, bem como outras APIs PySpark. Para mais informações, consulte o artigo O que é suportado no Spark 3.5.
Escritas diretas do bloco de notas do Spark Connect: as sessões do Spark num bloco de notas do BigQuery Studio pré-configuram o conetor do Spark BigQuery para fazer escritas de dados DIRETAS. O método de escrita DIRECT usa a API BigQuery Storage Write, que escreve dados diretamente no BigQuery. O método de escrita INDIRECT, que é o predefinido para batches sem servidor para o Apache Spark, escreve dados num contentor do Cloud Storage intermédio e, em seguida, escreve os dados no BigQuery (para mais informações sobre escritas INDIRECT, consulte Ler e escrever dados a partir do BigQuery e para o BigQuery).
APIs específicas do Dataproc: o Dataproc simplifica a adição dinâmica de pacotes
PyPI
à sua sessão do Spark através da extensão do métodoaddArtifacts
. Pode especificar a lista no formatoversion-scheme
, (semelhante apip install
). Isto indica ao servidor Spark Connect para instalar pacotes e respetivas dependências em todos os nós do cluster, tornando-os disponíveis para os trabalhadores das suas FDU.Exemplo que instala a versão
textdistance
especificada e as bibliotecasrandom2
mais recentes compatíveis no cluster para permitir que as UDFs que usamtextdistance
erandom2
sejam executadas em nós de trabalho.spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Ajuda com o código do bloco de notas: o bloco de notas do BigQuery Studio fornece ajuda com o código quando mantém o ponteiro sobre o nome de uma classe ou de um método, e fornece ajuda com a conclusão do código à medida que introduz código.
No exemplo seguinte, introduzir
DataprocSparkSession
. e manter o ponteiro sobre este nome de classe apresenta a conclusão de código e a ajuda da documentação.Exemplos de PySpark do bloco de notas do BigQuery Studio
Esta secção fornece exemplos de blocos de notas Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:
Contagem de palavras
O exemplo do Pyspark seguinte cria uma sessão do Spark e, em seguida, conta as ocorrências de palavras num conjunto de dados
bigquery-public-data.samples.shakespeare
público.# Basic wordcount example from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Run a wordcount on the public Shakespeare dataset. df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load() words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word")) word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word") word_counts_df.show()
Substitua o seguinte:
Saída:
A saída da célula apresenta um exemplo da saída da contagem de palavras. Para ver os detalhes da sessão na Google Cloud consola, clique no link Vista detalhada da sessão interativa. Para monitorizar a sua sessão do Spark, clique em Ver IU do Spark na página de detalhes da sessão.
Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Tabela de icebergues
Execute código PySpark para criar uma tabela Iceberg com metadados do metastore do BigLake
O código de exemplo seguinte cria um
sample_iceberg_table
com metadados de tabelas armazenados no metastore do BigLake e, em seguida, consulta a tabela.from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.functions as f # Create the Dataproc Serverless session. session = Session() # Set the session configuration for BigLake Metastore with the Iceberg environment. project_id = "PROJECT" region = "REGION" subnet_name = "SUBNET_NAME" location = "LOCATION" session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY" catalog = "CATALOG_NAME" namespace = "NAMESPACE" session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}" # Create the Spark Connect session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Create the namespace in BigQuery. spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create the Iceberg table. spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`"); spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE sample_iceberg_table;") # Insert table data and query the table. spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");") # Alter table, then query and display table data and schema. spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE sample_iceberg_table;") df = spark.sql("SELECT * FROM sample_iceberg_table") df.show() df.printSchema()
Notas:
O resultado da célula apresenta o
sample_iceberg_table
com a coluna adicionada e um link para a página Detalhes da sessão interativa na consola Google Cloud . Pode clicar em Ver IU do Spark na página de detalhes da sessão para monitorizar a sua sessão do Spark.Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
Veja os detalhes da tabela no BigQuery
Siga estes passos para verificar os detalhes da tabela Iceberg no BigQuery:
Outros exemplos
Crie um Spark
DataFrame
(sdf
) a partir de um Pandas DataFrame (df
).sdf = spark.createDataFrame(df) sdf.show()
Execute agregações no Spark
DataFrames
.from pyspark.sql import functions as F sdf.groupby("segment").agg( F.mean("total_spend_per_user").alias("avg_order_value"), F.approx_count_distinct("user_id").alias("unique_customers") ).show()
Leia a partir do BigQuery através do conetor Spark-BigQuery.
spark.conf.set("viewsEnabled","true") spark.conf.set("materializationDataset","my-bigquery-dataset") sdf = spark.read.format('bigquery') \ .load(query)
Escreva código Spark com o Gemini Code Assist
Pode pedir ao Gemini Code Assist para gerar código PySpark no seu bloco de notas. O Gemini Code Assist obtém e usa tabelas relevantes do BigQuery e do Dataproc Metastore, bem como os respetivos esquemas, para gerar uma resposta de código.
Para gerar código do Gemini Code Assist no seu bloco de notas, faça o seguinte:
Sugestões para a geração de código do Gemini Code Assist
Termine a sessão do Spark
Pode realizar qualquer uma das seguintes ações para parar a sessão do Spark Connect no bloco de notas do BigQuery Studio:
Orquestre o código do bloco de notas do BigQuery Studio
Pode orquestrar o código do bloco de notas do BigQuery Studio das seguintes formas:
Agende código do bloco de notas a partir da Google Cloud consola
Pode agendar código de blocos de notas das seguintes formas:
Execute código de bloco de notas como uma carga de trabalho em lote
Conclua os passos seguintes para executar o código do bloco de notas do BigQuery Studio como uma carga de trabalho em lote.
Resolva problemas de erros do bloco de notas
Se ocorrer uma falha numa célula que contenha código Spark, pode resolver o problema do erro clicando no link Vista detalhada da sessão interativa no resultado da célula (consulte os exemplos de Wordcount e tabela Iceberg).
Problemas conhecidos e soluções
Erro: um tempo de execução do bloco de notas criado com a versão
3.10
do Python pode causar um erroPYTHON_VERSION_MISMATCH
quando tenta estabelecer ligação à sessão do Spark.Solução: recrie o tempo de execução com a versão
3.11
do Python.O que se segue?