Execute código PySpark em blocos de notas do BigQuery Studio

Este documento mostra como executar código PySpark num bloco de notas Python do BigQuery.

Antes de começar

Se ainda não o fez, crie um Google Cloud projeto e um contentor do Cloud Storage.

  1. Configure o seu projeto

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Roles required to select or create a project

      • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
      • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Roles required to enable APIs

      To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

      Enable the APIs

    6. Crie um contentor do Cloud Storage no seu projeto se não tiver um que possa usar.

    7. Configure o seu notebook

    8. Preços

      Para informações sobre preços, consulte os preços de tempo de execução do bloco de notas do BigQuery.

      Abra um notebook Python do BigQuery Studio

      1. Na Google Cloud consola, aceda à página BigQuery.

        Aceda ao BigQuery

      2. Na barra de separadores do painel de detalhes, clique na seta junto ao sinal + e, de seguida, clique em Bloco de notas.

      Crie uma sessão do Spark num notebook do BigQuery Studio

      Pode usar um bloco de notas Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada bloco de notas do BigQuery Studio só pode ter uma sessão do Spark ativa associada.

      Pode criar uma sessão do Spark num bloco de notas Python do BigQuery Studio das seguintes formas:

      • Configure e crie uma única sessão no bloco de notas.
      • Configure uma sessão do Spark num modelo de sessão interativa e, em seguida, use o modelo para configurar e criar uma sessão no bloco de notas. O BigQuery oferece uma funcionalidade Query using Spark que ajuda a começar a programar a sessão baseada em modelos, conforme explicado no separador Sessão do Spark baseada em modelos.

      Sessão única

      Para criar uma sessão do Spark num novo notebook, faça o seguinte:

      1. Na barra de separadores do painel do editor, clique no menu pendente de seta junto ao sinal + e, de seguida, clique em Bloco de notas.

        Captura de ecrã que mostra a interface do BigQuery com o botão "+" para criar um novo notebook.
      2. Copie e execute o seguinte código numa célula do bloco de notas para configurar e criar uma sessão básica do Spark.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Substitua o seguinte:

      • APP_NAME: um nome opcional para a sua sessão.
      • Definições de sessão opcionais: pode adicionar definições da API Dataproc Session para personalizar a sessão. Seguem-se alguns exemplos:
        • RuntimeConfig:
          Ajuda de código que mostra as opções session.runtime.config.
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          Ajuda de código a mostrar opções de configuração de execução de configuração de ambiente de sessão.
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      Sessão do Spark com modelo

      Pode introduzir e executar o código numa célula do bloco de notas para criar uma sessão do Spark com base num modelo de sessão existente. Quaisquer sessiondefinições de configuração que fornecer no código do bloco de notas vão substituir as mesmas definições que estão definidas no modelo de sessão.

      Para começar rapidamente, use o Query using Sparkmodelo para pré-preencher o seu bloco de notas com código de modelo de sessão do Spark:

      1. Na barra de separadores do painel do editor, clique no menu pendente de seta junto ao sinal + e, de seguida, clique em Bloco de notas.
        Captura de ecrã que mostra a interface do BigQuery com o botão "+" para criar um novo notebook.
      2. Em Começar com um modelo, clique em Consultar com o Spark e, de seguida, clique em Usar modelo para inserir o código no seu bloco de notas.
        Seleções da IU do BigQuery para começar com um modelo
      3. Especifique as variáveis conforme explicado nas Notas.
      4. Pode eliminar quaisquer células de código de exemplo adicionais inseridas no bloco de notas.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Substitua o seguinte:

      • PROJECT: o ID do projeto, que está listado na secção Informações do projeto do Google Cloud painel de controlo da consola.
      • LOCATION: a região do Compute Engine onde a sessão do bloco de notas vai ser executada. Se não for fornecida, a localização predefinida é a região da VM que cria o bloco de notas.
      • SESSION_TEMPLATE: o nome de um modelo de sessão interativa existente. As definições de configuração da sessão são obtidas a partir do modelo. O modelo também tem de especificar as seguintes definições:

        • Versão do tempo de execução 2.3+
        • Tipo de bloco de notas: Spark Connect

          Exemplo:

          Captura de ecrã que mostra as definições necessárias do Spark Connect.
      • APP_NAME: um nome opcional para a sua sessão.

      Escreva e execute código PySpark no seu bloco de notas do BigQuery Studio

      Depois de criar uma sessão do Spark no bloco de notas, use a sessão para executar o código do bloco de notas do Spark no bloco de notas.

      Suporte da API PySpark do Spark Connect: a sessão do bloco de notas do Spark Connect suporta a maioria das APIs PySpark, incluindo DataFrame, Functions e Column, mas não suporta SparkContext e RDD, bem como outras APIs PySpark. Para mais informações, consulte o artigo O que é suportado no Spark 3.5.

      Escritas diretas do bloco de notas do Spark Connect: as sessões do Spark num bloco de notas do BigQuery Studio pré-configuram o conetor do Spark BigQuery para fazer escritas de dados DIRETAS. O método de escrita DIRECT usa a API BigQuery Storage Write, que escreve dados diretamente no BigQuery. O método de escrita INDIRECT, que é o predefinido para batches sem servidor para o Apache Spark, escreve dados num contentor do Cloud Storage intermédio e, em seguida, escreve os dados no BigQuery (para mais informações sobre escritas INDIRECT, consulte Ler e escrever dados a partir do BigQuery e para o BigQuery).

      APIs específicas do Dataproc: o Dataproc simplifica a adição dinâmica de pacotes PyPI à sua sessão do Spark através da extensão do método addArtifacts. Pode especificar a lista no formato version-scheme, (semelhante a pip install). Isto indica ao servidor Spark Connect para instalar pacotes e respetivas dependências em todos os nós do cluster, tornando-os disponíveis para os trabalhadores das suas FDU.

      Exemplo que instala a versão textdistance especificada e as bibliotecas random2 mais recentes compatíveis no cluster para permitir que as UDFs que usam textdistance e random2 sejam executadas em nós de trabalho.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      Ajuda com o código do bloco de notas: o bloco de notas do BigQuery Studio fornece ajuda com o código quando mantém o ponteiro sobre o nome de uma classe ou de um método, e fornece ajuda com a conclusão do código à medida que introduz código.

      No exemplo seguinte, introduzir DataprocSparkSession. e manter o ponteiro sobre este nome de classe apresenta a conclusão de código e a ajuda da documentação.

      Exemplos de dicas de documentação de código e preenchimento de código.

      Exemplos de PySpark do bloco de notas do BigQuery Studio

      Esta secção fornece exemplos de blocos de notas Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:

      • Executar uma contagem de palavras num conjunto de dados público de Shakespeare.
      • Crie uma tabela Iceberg com metadados guardados no metastore do BigLake.

      Contagem de palavras

      O exemplo do Pyspark seguinte cria uma sessão do Spark e, em seguida, conta as ocorrências de palavras num conjunto de dados bigquery-public-data.samples.shakespeare público.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      Substitua o seguinte:

      • APP_NAME: um nome opcional para a sua sessão.

      Saída:

      A saída da célula apresenta um exemplo da saída da contagem de palavras. Para ver os detalhes da sessão na Google Cloud consola, clique no link Vista detalhada da sessão interativa. Para monitorizar a sua sessão do Spark, clique em Ver IU do Spark na página de detalhes da sessão.

      Botão Ver IU do Spark na página de detalhes da sessão na consola
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Tabela de icebergues

      Execute código PySpark para criar uma tabela Iceberg com metadados do metastore do BigLake

      O código de exemplo seguinte cria um sample_iceberg_table com metadados de tabelas armazenados no metastore do BigLake e, em seguida, consulta a tabela.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project_id = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      Notas:

      • PROJECT: o ID do projeto, que está listado na secção Informações do projeto do Google Cloud painel de controlo da consola.
      • REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Serverless para Apache Spark ativa o Acesso privado do Google (PGA) na sub-rede especificada.
      • LOCATION: A predefinição BigQuery_metastore_config.location e spark.sql.catalog.{catalog}.gcp_location é US, mas pode escolher qualquer localização do BigQuery suportada.
      • BUCKET e WAREHOUSE_DIRECTORY: o contentor do Cloud Storage e a pasta usados para o diretório do armazém Iceberg.
      • CATALOG_NAME e NAMESPACE: o nome do catálogo e o espaço de nomes do Iceberg combinam-se para identificar a tabela Iceberg (catalog.namespace.table_name).
      • APP_NAME: um nome opcional para a sua sessão.

      O resultado da célula apresenta o sample_iceberg_table com a coluna adicionada e um link para a página Detalhes da sessão interativa na consola Google Cloud . Pode clicar em Ver IU do Spark na página de detalhes da sessão para monitorizar a sua sessão do Spark.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      Veja os detalhes da tabela no BigQuery

      Siga estes passos para verificar os detalhes da tabela Iceberg no BigQuery:

      1. Na Google Cloud consola, aceda à página BigQuery.

        Aceda ao BigQuery

      2. No painel de recursos do projeto, clique no seu projeto e, de seguida, clique no seu espaço de nomes para listar a tabela sample_iceberg_table. Clique na tabela Detalhes para ver as informações de Configuração da tabela do catálogo aberto.

        Os formatos de entrada e saída são os formatos de classe padrão do Hadoop InputFormat e OutputFormat que o Iceberg usa.

        Metadados da tabela Iceberg apresentados na IU do BigQuery

      Outros exemplos

      Crie um Spark DataFrame (sdf) a partir de um Pandas DataFrame (df).

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Execute agregações no Spark DataFrames.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Leia a partir do BigQuery através do conetor Spark-BigQuery.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Escreva código Spark com o Gemini Code Assist

      Pode pedir ao Gemini Code Assist para gerar código PySpark no seu bloco de notas. O Gemini Code Assist obtém e usa tabelas relevantes do BigQuery e do Dataproc Metastore, bem como os respetivos esquemas, para gerar uma resposta de código.

      Para gerar código do Gemini Code Assist no seu bloco de notas, faça o seguinte:

      1. Insira uma nova célula de código clicando em + Código na barra de ferramentas. A nova célula de código apresenta Start coding or generate with AI. Clique em gerar.

      2. No editor de geração, introduza um comando de linguagem natural e, de seguida, clique em enter. Certifique-se de que inclui a palavra-chave spark ou pyspark no comando.

        Exemplo de comando:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        Exemplo de saída:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Sugestões para a geração de código do Gemini Code Assist

      • Para permitir que o Gemini Code Assist obtenha tabelas e esquemas relevantes, ative a sincronização do Data Catalog para instâncias do Dataproc Metastore.

      • Certifique-se de que a sua conta de utilizador tem acesso ao Data Catalog às tabelas de consulta. Para isso, atribua a funçãoDataCatalog.Viewer.

      Termine a sessão do Spark

      Pode realizar qualquer uma das seguintes ações para parar a sessão do Spark Connect no bloco de notas do BigQuery Studio:

      • Execute spark.stop() numa célula do bloco de notas.
      • Termine o tempo de execução no bloco de notas:
        1. Clique no seletor de tempo de execução e, de seguida, clique em Gerir sessões.
          Faça a gestão da seleção de sessões
        2. Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e, de seguida, clique em Encerrar.
          Terminar seleção de sessão na caixa de diálogo Sessões ativas

      Orquestre o código do bloco de notas do BigQuery Studio

      Pode orquestrar o código do bloco de notas do BigQuery Studio das seguintes formas:

      Agende código do bloco de notas a partir da Google Cloud consola

      Pode agendar código de blocos de notas das seguintes formas:

      • Agende o notebook.
      • Se a execução de código do bloco de notas fizer parte de um fluxo de trabalho, agende o bloco de notas como parte de um pipeline.

      Execute código de bloco de notas como uma carga de trabalho em lote

      Conclua os passos seguintes para executar o código do bloco de notas do BigQuery Studio como uma carga de trabalho em lote.

      1. Transfira o código do bloco de notas para um ficheiro num terminal local ou no Cloud Shell.

        1. Abra o bloco de notas no painel Explorador na página BigQuery Studio na Google Cloud consola.

        2. Transfira o código do bloco de notas selecionando Transferir no menu Ficheiro e, de seguida, escolha Download .py.

          Menu Ficheiro > Transferir na página do explorador.
      2. Gere requirements.txt.

        1. Instale o pipreqs no diretório onde guardou o ficheiro .py.
          pip install pipreqs
          
        2. Execute pipreqs para gerar requirements.txt.

          pipreqs filename.py
          

        3. Use a CLI do Google Cloud para copiar o ficheiro requirements.txt local para um contentor no Cloud Storage.

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. Atualize o código da sessão do Spark editando o ficheiro .py transferido.

        1. Remova ou comente todos os comandos de script de shell.

        2. Remova o código que configura a sessão do Spark e, em seguida, especifique os parâmetros de configuração como parâmetros de envio da carga de trabalho em lote. (consulte o artigo Envie uma carga de trabalho em lote do Spark).

          Exemplo:

          • Remova a seguinte linha de configuração da sub-rede da sessão do código:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • Quando executar a carga de trabalho em lote, use a flag --subnet para especificar a sub-rede.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. Use um fragmento do código de criação de sessão simples.

          • Exemplo de código do bloco de notas transferido antes da simplificação.

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • Código de carga de trabalho em lote após a simplificação.

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. Execute a carga de trabalho em lote.

        1. Consulte o artigo Envie a carga de trabalho em lote do Spark para ver instruções.

          • Certifique-se de que inclui a flag --deps-bucket para apontar para o contentor do Cloud Storage que contém o ficheiro Your requirements.txt.

            Exemplo:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          Notas:

          • FILENAME: o nome do ficheiro de código do bloco de notas transferido e editado.
          • REGION: a região do Compute Engine onde o cluster está localizado.
          • BUCKET O nome do contentor do Cloud Storage que contém o seu ficheiro requirements.txt.
          • --version: spark runtime version 2.3 está selecionado para executar a carga de trabalho em lote.
      5. Confirme o seu código.

        1. Depois de testar o código da carga de trabalho em lote, pode consolidar o ficheiro .ipynb ou .py no seu repositório através do cliente git, como o GitHub, o GitLab ou o Bitbucket, como parte do pipeline de CI/CD.
      6. Agende a sua carga de trabalho em lote com o Cloud Composer.

        1. Consulte o artigo Execute cargas de trabalho sem servidor para Apache Spark com o Cloud Composer para ver instruções.

      Resolva problemas de erros do bloco de notas

      Se ocorrer uma falha numa célula que contenha código Spark, pode resolver o problema do erro clicando no link Vista detalhada da sessão interativa no resultado da célula (consulte os exemplos de Wordcount e tabela Iceberg).

      Problemas conhecidos e soluções

      Erro: um tempo de execução do bloco de notas criado com a versão 3.10 do Python pode causar um erro PYTHON_VERSION_MISMATCH quando tenta estabelecer ligação à sessão do Spark.

      Solução: recrie o tempo de execução com a versão 3.11 do Python.

      O que se segue?