Pode ativar componentes adicionais, como o Flink, quando cria um cluster do Dataproc através da funcionalidade Componentes opcionais. Esta página mostra como criar um cluster do Dataproc com o componente opcional Apache Flink ativado (um cluster do Flink) e, em seguida, executar tarefas do Flink no cluster.
Pode usar o cluster do Flink para:
Execute tarefas do Flink através do recurso
Jobs
do Dataproc a partir da Google Cloud consola, da Google Cloud CLI ou da API Dataproc.Execute tarefas do Flink com a CLI
flink
em execução no nó principal do cluster do Flink.Execute o Flink num cluster Kerberized
Crie um cluster do Dataproc Flink
Pode usar a Google Cloud consola, a Google Cloud CLI ou a API Dataproc para criar um cluster do Dataproc com o componente Flink ativado no cluster.
Recomendação: use um cluster de VMs padrão de 1 mestre com o componente Flink. Os clusters do modo de alta disponibilidade do Dataproc (com 3 VMs principais) não suportam o modo de alta disponibilidade do Flink.
Consola
Para criar um cluster do Dataproc Flink através da Google Cloud consola, siga estes passos:
Abra a página Dataproc Crie um cluster do Dataproc no Compute Engine.
- O painel Configurar cluster está selecionado.
- Na secção Controlo de versões, confirme ou altere o
Tipo e versão da imagem. A versão da imagem do cluster determina a versão do componente Flink instalado no cluster.
- A versão da imagem tem de ser 1.5 ou superior para ativar o componente Flink no cluster (consulte as versões do Dataproc suportadas para ver as fichas das versões dos componentes incluídas em cada lançamento de imagem do Dataproc).
- A versão da imagem tem de ser [TBD] ou superior para executar tarefas do Flink através da API Dataproc Jobs (consulte Executar tarefas do Flink do Dataproc).
- Na secção Componentes:
- Em Component Gateway, selecione Ativar Component Gateway. Tem de ativar o gateway de componentes para ativar o link do gateway de componentes para a IU do servidor de histórico do Flink. A ativação do gateway de componentes também permite o acesso à interface Web do gestor de tarefas do Flink em execução no cluster do Flink.
- Em Componentes opcionais, selecione Flink e outros componentes opcionais para ativar no cluster.
- Na secção Controlo de versões, confirme ou altere o
Tipo e versão da imagem. A versão da imagem do cluster determina a versão do componente Flink instalado no cluster.
Clique no painel Personalizar cluster (opcional).
Na secção Propriedades do cluster, clique em Adicionar propriedades para cada propriedade do cluster opcional que quer adicionar ao cluster. Pode adicionar propriedades com o prefixo
flink
para configurar propriedades do Flink em/etc/flink/conf/flink-conf.yaml
que vão funcionar como predefinições para as aplicações do Flink que executar no cluster.Exemplos:
- Defina
flink:historyserver.archive.fs.dir
para especificar a localização do Cloud Storage para escrever ficheiros do histórico de tarefas do Flink (esta localização vai ser usada pelo servidor do histórico do Flink em execução no cluster do Flink). - Defina slots de tarefas do Flink com
flink:taskmanager.numberOfTaskSlots=n
.
- Defina
Na secção Metadados de clusters personalizados, clique em Adicionar metadados para adicionar metadados opcionais. Por exemplo, adicione
flink-start-yarn-session
true
para executar o daemon do Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no nó principal do cluster para iniciar uma sessão do Flink YARN (consulte o modo de sessão do Flink).
Se estiver a usar a versão 2.0 ou anterior da imagem do Dataproc, clique no painel Gerir segurança (opcional) e, de seguida, em Acesso ao projeto, selecione
Enables the cloud-platform scope for this cluster
. O âmbitocloud-platform
está ativado por predefinição quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
- O painel Configurar cluster está selecionado.
Clique em Criar para criar o cluster.
gcloud
Para criar um cluster do Dataproc Flink através da CLI gcloud, execute o seguinte comando gcloud dataproc clusters create localmente numa janela de terminal ou no Cloud Shell:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
Notas:
- CLUSTER_NAME: especifique o nome do cluster.
- REGION: especifique uma região do Compute Engine onde o cluster vai estar localizado.
DATAPROC_IMAGE_VERSION: opcionalmente, especifique a versão da imagem a usar no cluster. A versão da imagem do cluster determina a versão do componente Flink instalado no cluster.
A versão da imagem tem de ser 1.5 ou superior para ativar o componente Flink no cluster (consulte as versões do Dataproc suportadas para ver as fichas das versões dos componentes incluídas em cada lançamento de imagem do Dataproc).
A versão da imagem tem de ser [TBD] ou superior para executar tarefas do Flink através da API Dataproc Jobs (consulte o artigo Execute tarefas do Flink do Dataproc).
--optional-components
: Tem de especificar o componenteFLINK
para executar tarefas do Flink e o serviço Web do Flink HistoryServer no cluster.--enable-component-gateway
: Tem de ativar o Component Gateway para ativar o link do Component Gateway para a IU do Flink History Server. A ativação do gateway de componentes também permite o acesso à interface Web do Flink Job Manager em execução no cluster do Flink.PROPERTIES. Opcionalmente, especifique uma ou mais propriedades do cluster.
Quando cria clusters do Dataproc com versões de imagens
2.0.67
+ e2.1.15
+, pode usar a flag--properties
para configurar propriedades do Flink em/etc/flink/conf/flink-conf.yaml
que vão atuar como predefinições para as aplicações Flink que executa no cluster.Pode definir
flink:historyserver.archive.fs.dir
para especificar a localização do Cloud Storage para escrever ficheiros do histórico de tarefas do Flink (esta localização vai ser usada pelo servidor do histórico do Flink em execução no cluster do Flink).Exemplo de várias propriedades:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
Outras sinalizações:
- Pode adicionar a flag opcional
--metadata flink-start-yarn-session=true
para executar o daemon do Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no nó principal do cluster para iniciar uma sessão do Flink YARN (consulte o modo de sessão do Flink).
- Pode adicionar a flag opcional
Quando usar as versões de imagem 2.0 ou anteriores, pode adicionar a flag
--scopes=https://www.googleapis.com/auth/cloud-platform
para ativar o acesso às APIs pelo seu cluster (consulte as Google Cloud práticas recomendadas de âmbitos). O âmbitocloud-platform
está ativado por predefinição quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
API
Para criar um cluster do Dataproc Flink através da API Dataproc, envie um pedido clusters.create, da seguinte forma:
Notas:
Defina o SoftwareConfig.Component como
FLINK
.Opcionalmente, pode definir
SoftwareConfig.imageVersion
para especificar a versão da imagem a usar no cluster. A versão da imagem do cluster determina a versão do componente Flink instalado no cluster.A versão da imagem tem de ser 1.5 ou superior para ativar o componente Flink no cluster (consulte as versões do Dataproc suportadas para ver as fichas das versões dos componentes incluídas em cada lançamento de imagem do Dataproc).
A versão da imagem tem de ser [TBD] ou superior para executar tarefas do Flink através da API Dataproc Jobs (consulte o artigo Execute tarefas do Flink do Dataproc).
Defina EndpointConfig.enableHttpPortAccess como
true
para ativar o gateway de componentes link para a IU do servidor de histórico do Flink. A ativação do gateway de componentes também permite o acesso à interface Web do Flink Job Manager em execução no cluster do Flink.Opcionalmente, pode definir
SoftwareConfig.properties
para especificar uma ou mais propriedades do cluster.- Pode especificar propriedades do Flink que atuam como predefinições para as aplicações do Flink que executa no cluster. Por exemplo, pode definir o
flink:historyserver.archive.fs.dir
para especificar a localização do Cloud Storage para escrever ficheiros do histórico de tarefas do Flink (esta localização vai ser usada pelo servidor de histórico do Flink em execução no cluster do Flink).
- Pode especificar propriedades do Flink que atuam como predefinições para as aplicações do Flink que executa no cluster. Por exemplo, pode definir o
Opcionalmente, pode definir:
GceClusterConfig.metadata
. Por exemplo, para especificarflink-start-yarn-session
true
para executar o daemon do Flink YARN (/usr/bin/flink-yarn-daemon
) em segundo plano no nó principal do cluster para iniciar uma sessão do Flink YARN (consulte Modo de sessão do Flink).- GceClusterConfig.serviceAccountScopes
para
https://www.googleapis.com/auth/cloud-platform
(âmbitocloud-platform
) quando usar versões de imagens 2.0 ou anteriores para ativar o acesso às Google Cloud APIs pelo seu cluster (consulte as práticas recomendadas de âmbitos). O âmbitocloud-platform
está ativado por predefinição quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
Depois de criar um cluster do Flink
- Use o link
Flink History Server
no Component Gateway para ver o Flink History Server em execução no cluster Flink. - Use o
YARN ResourceManager link
no gateway de componentes para ver a interface Web do Flink Job Manager em execução no cluster do Flink . - Crie um servidor de histórico persistente do Dataproc para ver os ficheiros do histórico de tarefas do Flink escritos por clusters do Flink existentes e eliminados.
Execute tarefas do Flink com o recurso Jobs
do Dataproc
Pode executar tarefas do Flink através do recurso Jobs
do Dataproc a partir da
Google Cloud consola, da CLI do Google Cloud ou da API Dataproc.
Consola
Para enviar uma tarefa de contagem de palavras do Flink de exemplo a partir da consola:
Abra a página Dataproc Enviar uma tarefa na Google Cloud consola no seu navegador.
Preencha os campos na página Enviar um trabalho:
- Selecione o nome do cluster na lista de clusters.
- Defina o Tipo de serviço como
Flink
. - Defina Main class or jar como
org.apache.flink.examples.java.wordcount.WordCount
. - Defina Ficheiros JAR como
file:///usr/lib/flink/examples/batch/WordCount.jar
.file:///
denota um ficheiro localizado no cluster. O Dataproc instalou oWordCount.jar
quando criou o cluster do Flink.- Este campo também aceita um caminho do Cloud Storage (
gs://BUCKET/JARFILE
) ou um caminho do Hadoop Distributed File System (HDFS) (hdfs://PATH_TO_JAR
).
Clique em Enviar.
- O resultado do controlador de tarefas é apresentado na página Detalhes da tarefa.
- As tarefas do Flink são apresentadas na página Tarefas do Dataproc na Google Cloud consola.
- Clique em Parar ou Eliminar na página Tarefas ou Detalhes da tarefa para parar ou eliminar uma tarefa.
gcloud
Para enviar uma tarefa Flink para um cluster Flink do Dataproc, execute o comando gcloud dataproc jobs submit localmente numa janela de terminal ou no Cloud Shell.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
Notas:
- CLUSTER_NAME: Especifique o nome do cluster do Dataproc Flink ao qual enviar a tarefa.
- REGION: especifique uma região do Compute Engine onde o cluster está localizado.
- MAIN_CLASS: especifique a classe
main
da sua aplicação Flink, como:org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: especifique o ficheiro JAR da aplicação Flink. Pode especificar:
- Um ficheiro JAR instalado no cluster, usando o prefixo
file:///` :
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Um ficheiro JAR no Cloud Storage:
gs://BUCKET/JARFILE
- Um ficheiro JAR no HDFS:
hdfs://PATH_TO_JAR
- Um ficheiro JAR instalado no cluster, usando o prefixo
JOB_ARGS: opcionalmente, adicione argumentos de tarefas após o duplo traço (
--
).Depois de enviar a tarefa, o resultado do controlador de tarefas é apresentado no terminal local ou do Cloud Shell.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
Esta secção mostra como enviar uma tarefa do Flink para um cluster do Flink do Dataproc usando a API jobs.submit do Dataproc.
Antes de usar qualquer um dos dados do pedido, faça as seguintes substituições:
- PROJECT_ID: Google Cloud ID do projeto
- REGION: região do cluster
- CLUSTER_NAME: especifique o nome do cluster do Dataproc Flink para o qual quer enviar a tarefa
Método HTTP e URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
Corpo JSON do pedido:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
Para enviar o seu pedido, expanda uma destas opções:
Deve receber uma resposta JSON semelhante à seguinte:
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- As tarefas do Flink são apresentadas na página Tarefas do Dataproc na Google Cloud consola.
- Pode clicar em Parar ou Eliminar na página Tarefas ou Detalhes da tarefa na Google Cloud consola para parar ou eliminar uma tarefa.
Execute tarefas do Flink através da CLI flink
Em vez de
executar tarefas do Flink com o recurso Jobs
do Dataproc,
pode executar tarefas do Flink no nó principal do cluster do Flink com a CLI flink
.
As secções seguintes descrevem diferentes formas de executar uma tarefa da CLI no cluster do Dataproc Flink.flink
SSH para o nó principal: use o utilitário SSH para abrir uma janela de terminal na VM principal do cluster.
Defina o caminho de classe: inicialize o caminho de classe do Hadoop a partir da janela do terminal SSH na VM principal do cluster do Flink:
export HADOOP_CLASSPATH=$(hadoop classpath)
Executar tarefas do Flink: pode executar tarefas do Flink em diferentes modos de implementação no YARN: modo de aplicação, por tarefa e de sessão.
Modo de aplicação: o modo de aplicação do Flink é suportado pela versão 2.0 e posteriores da imagem do Dataproc. Este modo executa o método
main()
do trabalho no gestor de trabalhos do YARN. O cluster é encerrado após a conclusão da tarefa.Exemplo de envio de emprego:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
Apresentar tarefas em execução:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
Cancelar uma tarefa em execução:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Modo por tarefa: este modo do Flink executa o método
main()
da tarefa no lado do cliente.Exemplo de envio de emprego:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
Modo de sessão: inicie uma sessão do Flink YARN de execução prolongada e, em seguida, envie um ou mais trabalhos para a sessão.
Iniciar uma sessão: pode iniciar uma sessão do Flink de uma das seguintes formas:
Crie um cluster do Flink, adicionando a flag
--metadata flink-start-yarn-session=true
ao comandogcloud dataproc clusters create
(consulte o artigo Crie um cluster do Flink no Dataproc). Com esta flag ativada, após a criação do cluster, o Dataproc executa/usr/bin/flink-yarn-daemon
para iniciar uma sessão do Flink no cluster.O ID da aplicação YARN da sessão é guardado em
/tmp/.yarn-properties-${USER}
. Pode listar o ID com o comandoyarn application -list
.Execute o script Flink
yarn-session.sh
pré-instalado na VM principal do cluster com definições personalizadas:Exemplo com definições personalizadas:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
Execute o script de wrapper do Flink com as definições predefinidas:
/usr/bin/flink-yarn-daemon
. /usr/bin/flink-yarn-daemon
Envie uma tarefa para uma sessão: execute o seguinte comando para enviar uma tarefa do Flink para a sessão.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: o URL, incluindo o anfitrião e a porta, da VM principal do Flink onde os trabalhos são executados.
Remova o
http:// prefix
do URL. Este URL é apresentado no resultado do comando quando inicia uma sessão do Flink. Pode executar o seguinte comando para listar este URL no campoTracking-URL
:
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: o URL, incluindo o anfitrião e a porta, da VM principal do Flink onde os trabalhos são executados.
Remova o
Liste trabalhos numa sessão: para listar trabalhos do Flink numa sessão, faça uma das seguintes ações:
Executar
flink list
sem argumentos. O comando procura o ID da aplicação YARN da sessão em/tmp/.yarn-properties-${USER}
.Obtenha o ID da aplicação YARN da sessão a partir de
/tmp/.yarn-properties-${USER}
ou do resultado deyarn application -list
e, em seguida, execute<code>
flink list -yid YARN_APPLICATION_ID.Corrida
flink list -m FLINK_MASTER_URL
.
Parar uma sessão: para parar a sessão, obtenha o ID da aplicação YARN da sessão a partir de
/tmp/.yarn-properties-${USER}
ou da saída deyarn application -list
e, em seguida, execute um dos seguintes comandos:echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Execute tarefas do Apache Beam no Flink
Pode executar tarefas do Apache Beam no Dataproc através do
FlinkRunner
.
Pode executar tarefas do Beam no Flink das seguintes formas:
- Trabalhos Java Beam
- Empregos na Portable Beam
Trabalhos Java Beam
Empacote as suas tarefas do Beam num ficheiro JAR. Forneça o ficheiro JAR integrado com as dependências necessárias para executar o trabalho.
O exemplo seguinte executa uma tarefa do Java Beam a partir do nó principal do cluster do Dataproc.
Crie um cluster do Dataproc com o componente Flink ativado.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: a versão da imagem do cluster, que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões dos componentes do Apache Flink indicadas para as quatro versões de lançamento de imagens 2.0.x mais recentes e anteriores).--region
: uma região do Dataproc suportada.--enable-component-gateway
: ative o acesso à IU do Flink Job Manager.--scopes
: ative o acesso às Google Cloud APIs pelo seu cluster (consulte as práticas recomendadas de âmbitos). O âmbitocloud-platform
está ativado por predefinição (não precisa de incluir esta definição de sinalizador) quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
Use o utilitário SSH para abrir uma janela de terminal no nó principal do cluster Flink.
Inicie uma sessão do Flink YARN no nó principal do cluster do Dataproc.
. /usr/bin/flink-yarn-daemon
Tome nota da versão do Flink no seu cluster do Dataproc.
flink --version
Na sua máquina local, gere o exemplo canónico de contagem de palavras do Beam em Java.
Escolha uma versão do Beam compatível com a versão do Flink no seu cluster do Dataproc. Consulte a tabela Compatibilidade de versões do Flink que lista a compatibilidade de versões do Beam-Flink.
Abra o ficheiro POM gerado. Verifique a versão do Beam Flink especificada pela etiqueta
<flink.artifact.name>
. Se a versão do Beam Flink runner no nome do artefacto do Flink não corresponder à versão do Flink no cluster, atualize o número da versão para corresponder.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
Exemplo de pacote de contagem de palavras.
mvn package -Pflink-runner
Carregue o ficheiro JAR uber compactado,
word-count-beam-bundled-0.1.jar
(~135 MB) para o nó principal do cluster do Dataproc. Pode usargcloud storage cp
para transferências de ficheiros mais rápidas para o cluster do Dataproc a partir do Cloud Storage.No terminal local, crie um contentor do Cloud Storage e carregue o JAR completo.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
No nó principal do Dataproc, transfira o JAR completo.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Execute a tarefa Java Beam no nó principal do cluster do Dataproc.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
Verifique se os resultados foram escritos no seu contentor do Cloud Storage.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Pare a sessão do Flink YARN.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
Empregos de operador de guindaste de viga portátil
Para executar tarefas do Beam escritas em Python, Go e outros idiomas suportados, pode usar o FlinkRunner
e o PortableRunner
, conforme descrito na página do Flink Runner do Beam (consulte também o Portability Framework Roadmap).
O exemplo seguinte executa uma tarefa portátil do Beam em Python a partir do nó principal do cluster do Dataproc.
Crie um cluster do Dataproc com os componentes Flink e Docker ativados.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
Notas:
--optional-components
: Flink e Docker.--image-version
: a versão da imagem do cluster, que determina a versão do Flink instalada no cluster (por exemplo, consulte as versões dos componentes do Apache Flink indicadas para as quatro versões de lançamento de imagens 2.0.x mais recentes e anteriores).--region
: uma região do Dataproc disponível.--enable-component-gateway
: ative o acesso à IU do Flink Job Manager.--scopes
: ative o acesso às Google Cloud APIs pelo seu cluster (consulte as práticas recomendadas de âmbitos). O âmbitocloud-platform
está ativado por predefinição (não precisa de incluir esta definição de sinalizador) quando cria um cluster que usa a versão 2.1 ou posterior da imagem do Dataproc.
Use a CLI gcloud localmente ou no Cloud Shell para criar um contentor do Cloud Storage. Especifica o BUCKET_NAME quando executa um programa de contagem de palavras de exemplo.
gcloud storage buckets create BUCKET_NAME
Numa janela de terminal na VM do cluster, inicie uma sessão do Flink YARN. Tenha em atenção o URL principal do Flink, o endereço do Flink principal onde os trabalhos são executados. Especifica o FLINK_MASTER_URL quando executa um programa de contagem de palavras de exemplo.
. /usr/bin/flink-yarn-daemon
Apresente e tome nota da versão do Flink que está a executar o cluster do Dataproc. Especifica o FLINK_VERSION quando executa um programa de contagem de palavras de exemplo.
flink --version
Instale as bibliotecas Python necessárias para a tarefa no nó principal do cluster.
Instale uma versão do Beam compatível com a versão do Flink no cluster.
python -m pip install apache-beam[gcp]==BEAM_VERSION
Execute o exemplo de contagem de palavras no nó principal do cluster.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
Notas:
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, conforme indicado anteriormente.--flink_master
: FLINK_MASTER_URL, conforme indicado anteriormente.--flink_submit_uber_jar
: use o JAR completo para executar a tarefa do Beam.--output
: BUCKET_NAME, criado anteriormente.
Verifique se os resultados foram escritos no seu contentor.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Pare a sessão do Flink YARN.
- Obtenha o ID da aplicação.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Execute o Flink num cluster Kerberized
O componente Flink do Dataproc suporta clusters Kerberized. É necessária uma permissão Kerberos válida para enviar e persistir um trabalho do Flink ou para iniciar um cluster do Flink. Por predefinição, um pedido do Kerberos permanece válido durante sete dias.
Aceda à IU do Flink Job Manager
A interface Web do Flink Job Manager está disponível enquanto um trabalho do Flink ou um cluster de sessão do Flink está em execução. Para usar a interface Web:
- Crie um cluster do Dataproc Flink.
- Após a criação do cluster, clique no Component Gateway link do YARN ResourceManager no separador Interface Web na página Detalhes do cluster na Google Cloud consola.
- Na IU do YARN Resource Manager, identifique a entrada da aplicação do cluster Flink. Consoante o estado de conclusão de uma tarefa, é apresentado um link ApplicationMaster
ou History.
- Para uma tarefa de streaming de execução prolongada, clique no link ApplicationManager para abrir o painel de controlo do Flink. Para uma tarefa concluída, clique no link Histórico para ver os detalhes da tarefa.