How to read dynamic argument Airflow operator?


Problem Description

I am new to Python and Airflow DAGs. I am following the link below and the code mentioned in its answer section.
How to pass dynamic arguments Airflow operator?

I am facing an issue reading a YAML file. The YAML file contains some configuration-related arguments:

configs:
    cluster_name: "test-cluster"
    project_id: "t***********"
    zone: "europe-west1-c"
    num_workers: 2
    worker_machine_type: "n1-standard-1"
    master_machine_type: "n1-standard-1"

In the DAG script I have created one task that will create a cluster. Before executing this task we need all the arguments to pass in its default_args parameter, such as cluster_name, project_id, etc. For reading those parameters I have created a readYML method; see the code below.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.contrib.operators import dataproc_operator
from airflow.models import Variable
import yaml

def readYML():
    print("inside readYML")
    global cfg
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)
    print(cfg['configs']['cluster_name'])

# Default Arguments
readYML()

dag_name = Variable.get("dag_name")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    #'cluster_name': cfg['configs']['cluster_name'],
}

# Instantiate a DAG
dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

# Creating Tasks
Task1 = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_cluster',
    dag=dag
)

There is no error in this code. When I upload it to the GCP Composer environment, no error notification is shown, but the DAG is not runnable: no Run button appears.

See the attached screenshot. I am using Python 3 and the composer-1.7.2-airflow-1.10.2 version.

Solution

According to the Data Stored in Cloud Storage page in the Cloud Composer docs:

To avoid a webserver error, make sure that the data the webserver needs to parse a DAG (not run it) is available in the dags/ folder. Otherwise, the webserver can't access the data or load the Airflow web interface.

Your DAG is attempting to open the YAML file under /home/airflow/gcs/data, which isn't present on the webserver. Put the file under the dags/ folder in your GCS bucket, and it will be accessible to the scheduler, workers, and webserver, and the DAG will work in the Web UI.
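As a minimal sketch of the fix, assuming cluster_config.yml has been copied into the environment bucket's dags/ folder (for example with gsutil cp cluster_config.yml gs://<your-bucket>/dags/), readYML only needs to point at the dags/ mount instead of data/:

import yaml

def readYML():
    # /home/airflow/gcs/dags is where Cloud Composer mounts the bucket's
    # dags/ folder, so this path exists on the scheduler, workers, and
    # webserver alike.
    global cfg
    file_name = "/home/airflow/gcs/dags/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        # safe_load is preferred over load; it avoids executing
        # arbitrary YAML tags from the file.
        cfg = yaml.safe_load(ymlfile)
    print(cfg['configs']['cluster_name'])

With the file under dags/, the webserver can parse the DAG and the Run button appears in the web UI.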
