Componente Flink opcional do Dataproc

Pode ativar componentes adicionais, como o Flink, quando cria um cluster do Dataproc através da funcionalidade Componentes opcionais. Esta página mostra como criar um cluster do Dataproc com o componente opcional Apache Flink ativado (um cluster do Flink) e, em seguida, executar tarefas do Flink no cluster.

Pode usar o cluster do Flink para:

  1. Execute tarefas do Flink através do recurso Jobs do Dataproc a partir da Google Cloud consola, da Google Cloud CLI ou da API Dataproc.

  2. Execute tarefas do Flink com a CLI flink em execução no nó principal do cluster do Flink.

  3. Execute tarefas do Apache Beam no Flink

  4. Execute o Flink num cluster Kerberized

Pode usar a Google Cloud consola, a Google Cloud CLI ou a API Dataproc para criar um cluster do Dataproc com o componente Flink ativado no cluster.

Recomendação: use um cluster de VMs padrão de 1 mestre com o componente Flink. Os clusters do modo de alta disponibilidade do Dataproc (com 3 VMs principais) não suportam o modo de alta disponibilidade do Flink.

Pode executar tarefas do Flink através do recurso Jobs do Dataproc a partir da Google Cloud consola, da CLI do Google Cloud ou da API Dataproc.

Consola

Para enviar uma tarefa de contagem de palavras do Flink de exemplo a partir da consola:

  1. Abra a página Dataproc Enviar uma tarefa na Google Cloud consola no seu navegador.

  2. Preencha os campos na página Enviar um trabalho:

    1. Selecione o nome do cluster na lista de clusters.
    2. Defina o Tipo de serviço como Flink.
    3. Defina Main class or jar como org.apache.flink.examples.java.wordcount.WordCount.
    4. Defina Ficheiros JAR como file:///usr/lib/flink/examples/batch/WordCount.jar.
      • file:/// denota um ficheiro localizado no cluster. O Dataproc instalou o WordCount.jar quando criou o cluster do Flink.
      • Este campo também aceita um caminho do Cloud Storage (gs://BUCKET/JARFILE) ou um caminho do Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR).
  3. Clique em Enviar.

    • O resultado do controlador de tarefas é apresentado na página Detalhes da tarefa.
    • As tarefas do Flink são apresentadas na página Tarefas do Dataproc na Google Cloud consola.
    • Clique em Parar ou Eliminar na página Tarefas ou Detalhes da tarefa para parar ou eliminar uma tarefa.

gcloud

Para enviar uma tarefa Flink para um cluster Flink do Dataproc, execute o comando gcloud dataproc jobs submit localmente numa janela de terminal ou no Cloud Shell.

gcloud dataproc jobs submit flink \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --class=MAIN_CLASS \
    --jar=JAR_FILE \
    -- JOB_ARGS

Notas:

  • CLUSTER_NAME: Especifique o nome do cluster do Dataproc Flink ao qual enviar a tarefa.
  • REGION: especifique uma região do Compute Engine onde o cluster está localizado.
  • MAIN_CLASS: especifique a classe main da sua aplicação Flink, como:
    • org.apache.flink.examples.java.wordcount.WordCount
  • JAR_FILE: especifique o ficheiro JAR da aplicação Flink. Pode especificar:
    • Um ficheiro JAR instalado no cluster, usando o prefixo file:///` :
      • file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
      • file:///usr/lib/flink/examples/batch/WordCount.jar
    • Um ficheiro JAR no Cloud Storage: gs://BUCKET/JARFILE
    • Um ficheiro JAR no HDFS: hdfs://PATH_TO_JAR
  • JOB_ARGS: opcionalmente, adicione argumentos de tarefas após o duplo traço (--).

  • Depois de enviar a tarefa, o resultado do controlador de tarefas é apresentado no terminal local ou do Cloud Shell.

    Program execution finished
    Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished.
    Job Runtime: 13610 ms
    ...
    (after,1)
    (and,12)
    (arrows,1)
    (ay,1)
    (be,4)
    (bourn,1)
    (cast,1)
    (coil,1)
    (come,1)

REST

Esta secção mostra como enviar uma tarefa do Flink para um cluster do Flink do Dataproc usando a API jobs.submit do Dataproc.

Antes de usar qualquer um dos dados do pedido, faça as seguintes substituições:

  • PROJECT_ID: Google Cloud ID do projeto
  • REGION: região do cluster
  • CLUSTER_NAME: especifique o nome do cluster do Dataproc Flink para o qual quer enviar a tarefa

Método HTTP e URL:

POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit

Corpo JSON do pedido:

{
  "job": {
    "placement": {
      "clusterName": "CLUSTER_NAME"
    },
    "flinkJob": {
      "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
      "jarFileUris": [
        "file:///usr/lib/flink/examples/batch/WordCount.jar"
      ]
    }
  }
}

Para enviar o seu pedido, expanda uma destas opções:

Deve receber uma resposta JSON semelhante à seguinte:

{
  "reference": {
    "projectId": "PROJECT_ID",
    "jobId": "JOB_ID"
  },
  "placement": {
    "clusterName": "CLUSTER_NAME",
    "clusterUuid": "CLUSTER_UUID"
  },
  "flinkJob": {
    "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
    "args": [
      "1000"
    ],
    "jarFileUris": [
      "file:///usr/lib/flink/examples/batch/WordCount.jar"
    ]
  },
  "status": {
    "state": "PENDING",
    "stateStartTime": "2020-10-07T20:16:21.759Z"
  },
  "jobUuid": "JOB_UUID"
}
  • As tarefas do Flink são apresentadas na página Tarefas do Dataproc na Google Cloud consola.
  • Pode clicar em Parar ou Eliminar na página Tarefas ou Detalhes da tarefa na Google Cloud consola para parar ou eliminar uma tarefa.

Em vez de executar tarefas do Flink com o recurso Jobs do Dataproc, pode executar tarefas do Flink no nó principal do cluster do Flink com a CLI flink.

As secções seguintes descrevem diferentes formas de executar uma tarefa da CLI no cluster do Dataproc Flink.flink

  1. SSH para o nó principal: use o utilitário SSH para abrir uma janela de terminal na VM principal do cluster.

  2. Defina o caminho de classe: inicialize o caminho de classe do Hadoop a partir da janela do terminal SSH na VM principal do cluster do Flink:

    export HADOOP_CLASSPATH=$(hadoop classpath)
    
  3. Executar tarefas do Flink: pode executar tarefas do Flink em diferentes modos de implementação no YARN: modo de aplicação, por tarefa e de sessão.

    1. Modo de aplicação: o modo de aplicação do Flink é suportado pela versão 2.0 e posteriores da imagem do Dataproc. Este modo executa o método main() do trabalho no gestor de trabalhos do YARN. O cluster é encerrado após a conclusão da tarefa.

      Exemplo de envio de emprego:

      flink run-application \
          -t yarn-application \
          -Djobmanager.memory.process.size=1024m \
          -Dtaskmanager.memory.process.size=2048m \
          -Djobmanager.heap.mb=820 \
          -Dtaskmanager.heap.mb=1640 \
          -Dtaskmanager.numberOfTaskSlots=2 \
          -Dparallelism.default=4 \
          /usr/lib/flink/examples/batch/WordCount.jar
      

      Apresentar tarefas em execução:

      ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
      

      Cancelar uma tarefa em execução:

      ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
      
    2. Modo por tarefa: este modo do Flink executa o método main() da tarefa no lado do cliente.

      Exemplo de envio de emprego:

      flink run \
          -m yarn-cluster \
          -p 4 \
          -ys 2 \
          -yjm 1024m \
          -ytm 2048m \
          /usr/lib/flink/examples/batch/WordCount.jar
      
    3. Modo de sessão: inicie uma sessão do Flink YARN de execução prolongada e, em seguida, envie um ou mais trabalhos para a sessão.

      1. Iniciar uma sessão: pode iniciar uma sessão do Flink de uma das seguintes formas:

        1. Crie um cluster do Flink, adicionando a flag --metadata flink-start-yarn-session=true ao comando gcloud dataproc clusters create (consulte o artigo Crie um cluster do Flink no Dataproc). Com esta flag ativada, após a criação do cluster, o Dataproc executa /usr/bin/flink-yarn-daemon para iniciar uma sessão do Flink no cluster.

          O ID da aplicação YARN da sessão é guardado em /tmp/.yarn-properties-${USER}. Pode listar o ID com o comando yarn application -list.

        2. Execute o script Flink yarn-session.sh pré-instalado na VM principal do cluster com definições personalizadas:

          Exemplo com definições personalizadas:

          /usr/lib/flink/bin/yarn-session.sh \
              -s 1 \
              -jm 1024m \
              -tm 2048m \
              -nm flink-dataproc \
              --detached
          
        3. Execute o script de wrapper do Flink com as definições predefinidas:/usr/bin/flink-yarn-daemon

          . /usr/bin/flink-yarn-daemon
          
      2. Envie uma tarefa para uma sessão: execute o seguinte comando para enviar uma tarefa do Flink para a sessão.

        flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
        
        • FLINK_MASTER_URL: o URL, incluindo o anfitrião e a porta, da VM principal do Flink onde os trabalhos são executados. Remova o http:// prefix do URL. Este URL é apresentado no resultado do comando quando inicia uma sessão do Flink. Pode executar o seguinte comando para listar este URL no campo Tracking-URL:
        yarn application -list -appId=<yarn-app-id> | sed 's#http://##'
           ```
        
      3. Liste trabalhos numa sessão: para listar trabalhos do Flink numa sessão, faça uma das seguintes ações:

        • Executar flink list sem argumentos. O comando procura o ID da aplicação YARN da sessão em /tmp/.yarn-properties-${USER}.

        • Obtenha o ID da aplicação YARN da sessão a partir de /tmp/.yarn-properties-${USER} ou do resultado de yarn application -list e, em seguida, execute <code>flink list -yid YARN_APPLICATION_ID.

        • Corrida flink list -m FLINK_MASTER_URL.

      4. Parar uma sessão: para parar a sessão, obtenha o ID da aplicação YARN da sessão a partir de /tmp/.yarn-properties-${USER} ou da saída de yarn application -list e, em seguida, execute um dos seguintes comandos:

        echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
        
        yarn application -kill YARN_APPLICATION_ID
        

Pode executar tarefas do Apache Beam no Dataproc através do FlinkRunner.

Pode executar tarefas do Beam no Flink das seguintes formas:

  1. Trabalhos Java Beam
  2. Empregos na Portable Beam

Trabalhos Java Beam

Empacote as suas tarefas do Beam num ficheiro JAR. Forneça o ficheiro JAR integrado com as dependências necessárias para executar o trabalho.

O exemplo seguinte executa uma tarefa do Java Beam a partir do nó principal do cluster do Dataproc.

  1. Crie um cluster do Dataproc com o componente Flink ativado.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: a versão da imagem do cluster, que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões dos componentes do Apache Flink indicadas para as quatro versões de lançamento de imagens 2.0.x mais recentes e anteriores).
    • --region: uma região do Dataproc suportada.
    • --enable-component-gateway: ative o acesso à IU do Flink Job Manager.
    • --scopes: ative o acesso às Google Cloud APIs pelo seu cluster (consulte as práticas recomendadas de âmbitos). O âmbito cloud-platform está ativado por predefinição (não precisa de incluir esta definição de sinalizador) quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
  2. Use o utilitário SSH para abrir uma janela de terminal no nó principal do cluster Flink.

  3. Inicie uma sessão do Flink YARN no nó principal do cluster do Dataproc.

    . /usr/bin/flink-yarn-daemon
    

    Tome nota da versão do Flink no seu cluster do Dataproc.

    flink --version
    
  4. Na sua máquina local, gere o exemplo canónico de contagem de palavras do Beam em Java.

    Escolha uma versão do Beam compatível com a versão do Flink no seu cluster do Dataproc. Consulte a tabela Compatibilidade de versões do Flink que lista a compatibilidade de versões do Beam-Flink.

    Abra o ficheiro POM gerado. Verifique a versão do Beam Flink especificada pela etiqueta <flink.artifact.name>. Se a versão do Beam Flink runner no nome do artefacto do Flink não corresponder à versão do Flink no cluster, atualize o número da versão para corresponder.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. Exemplo de pacote de contagem de palavras.

    mvn package -Pflink-runner
    
  6. Carregue o ficheiro JAR uber compactado, word-count-beam-bundled-0.1.jar (~135 MB) para o nó principal do cluster do Dataproc. Pode usar gcloud storage cp para transferências de ficheiros mais rápidas para o cluster do Dataproc a partir do Cloud Storage.

    1. No terminal local, crie um contentor do Cloud Storage e carregue o JAR completo.

      gcloud storage buckets create BUCKET_NAME
      
      gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. No nó principal do Dataproc, transfira o JAR completo.

      gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Execute a tarefa Java Beam no nó principal do cluster do Dataproc.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. Verifique se os resultados foram escritos no seu contentor do Cloud Storage.

    gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Pare a sessão do Flink YARN.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

Empregos de operador de guindaste de viga portátil

Para executar tarefas do Beam escritas em Python, Go e outros idiomas suportados, pode usar o FlinkRunner e o PortableRunner, conforme descrito na página do Flink Runner do Beam (consulte também o Portability Framework Roadmap).

O exemplo seguinte executa uma tarefa portátil do Beam em Python a partir do nó principal do cluster do Dataproc.

  1. Crie um cluster do Dataproc com os componentes Flink e Docker ativados.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    Notas:

    • --optional-components: Flink e Docker.
    • --image-version: a versão da imagem do cluster, que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões dos componentes do Apache Flink indicadas para as quatro versões de lançamento de imagens 2.0.x mais recentes e anteriores).
    • --region: uma região do Dataproc disponível.
    • --enable-component-gateway: ative o acesso à IU do Flink Job Manager.
    • --scopes: ative o acesso às Google Cloud APIs pelo seu cluster (consulte as práticas recomendadas de âmbitos). O âmbito cloud-platform está ativado por predefinição (não precisa de incluir esta definição de sinalizador) quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
  2. Use a CLI gcloud localmente ou no Cloud Shell para criar um contentor do Cloud Storage. Especifica o BUCKET_NAME quando executa um programa de contagem de palavras de exemplo.

    gcloud storage buckets create BUCKET_NAME
    
  3. Numa janela de terminal na VM do cluster, inicie uma sessão do Flink YARN. Tenha em atenção o URL principal do Flink, o endereço do Flink principal onde os trabalhos são executados. Especifica o FLINK_MASTER_URL quando executa um programa de contagem de palavras de exemplo.

    . /usr/bin/flink-yarn-daemon
    

    Apresente e tome nota da versão do Flink que está a executar o cluster do Dataproc. Especifica o FLINK_VERSION quando executa um programa de contagem de palavras de exemplo.

    flink --version
    
  4. Instale as bibliotecas Python necessárias para a tarefa no nó principal do cluster.

  5. Instale uma versão do Beam compatível com a versão do Flink no cluster.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Execute o exemplo de contagem de palavras no nó principal do cluster.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    Notas:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, conforme indicado anteriormente.
    • --flink_master: FLINK_MASTER_URL, conforme indicado anteriormente.
    • --flink_submit_uber_jar: use o JAR completo para executar a tarefa do Beam.
    • --output: BUCKET_NAME, criado anteriormente.
  7. Verifique se os resultados foram escritos no seu contentor.

    gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Pare a sessão do Flink YARN.

    1. Obtenha o ID da aplicação.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

O componente Flink do Dataproc suporta clusters Kerberized. É necessária uma permissão Kerberos válida para enviar e persistir um trabalho do Flink ou para iniciar um cluster do Flink. Por predefinição, um pedido do Kerberos permanece válido durante sete dias.

A interface Web do Flink Job Manager está disponível enquanto um trabalho do Flink ou um cluster de sessão do Flink está em execução. Para usar a interface Web:

  1. Crie um cluster do Dataproc Flink.
  2. Após a criação do cluster, clique no Component Gateway link do YARN ResourceManager no separador Interface Web na página Detalhes do cluster na Google Cloud consola.
  3. Na IU do YARN Resource Manager, identifique a entrada da aplicação do cluster Flink. Consoante o estado de conclusão de uma tarefa, é apresentado um link ApplicationMaster ou History.
  4. Para uma tarefa de streaming de execução prolongada, clique no link ApplicationManager para abrir o painel de controlo do Flink. Para uma tarefa concluída, clique no link Histórico para ver os detalhes da tarefa.