How to read dynamic argument airflow operator?
Problem description
I am new to Python and Airflow DAGs.
I am following the link below and the code mentioned in its answer section.
How to pass dynamic arguments Airflow operator?
I am having trouble reading a YAML file. The YAML file contains some configuration-related arguments:
configs:
  cluster_name: "test-cluster"
  project_id: "t***********"
  zone: "europe-west1-c"
  num_workers: 2
  worker_machine_type: "n1-standard-1"
  master_machine_type: "n1-standard-1"
In the DAG script I have created one task that creates a cluster. Before this task runs, we need all the arguments that are passed to it through the default_args parameter, such as cluster_name, project_id, etc. To read those parameters I have created a readYML method. See the code below:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.contrib.operators import dataproc_operator
from airflow.models import Variable
import yaml
def readYML():
    print("inside readYML")
    global cfg
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)
    print(cfg['configs']['cluster_name'])
# Default Arguments
readYML()
dag_name = Variable.get("dag_name")
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    #'cluster_name': cfg['configs']['cluster_name'],
}
# Instantiate a DAG
dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))
# Creating Tasks
Task1 = DataprocClusterCreateOperator(
    task_id='create_cluster',
    dag=dag
)
This code raises no error. When I upload it to the GCP Composer environment, no error notification is shown, but the DAG is not runnable: no Run button appears.
See the attached screenshot. I am using Python 3 and the composer-1.7.2-airflow-1.10.2 version.
According to the Data Stored in Cloud Storage page in the Cloud Composer docs:
To avoid a webserver error, make sure that data the webserver needs to parse a DAG (not run) is available in the dags/ folder. Otherwise, the webserver can't access the data or load the Airflow web interface.
Your DAG is attempting to open the YAML file under /home/airflow/gcs/data, which isn't present on the webserver. Put the file under the dags/ folder in your GCS bucket, and it will be accessible to the scheduler, workers, and webserver, and the DAG will work in the Web UI.
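As a rough illustration of that advice, the sketch below reads the config from a directory passed in by the caller instead of the hard-coded data/ path. It assumes cluster_config.yml has been uploaded to dags/ alongside the DAG file. The helper name load_cluster_config is hypothetical and not part of Airflow or Composer, and yaml.safe_load is swapped in for the question's yaml.load as a safer way to parse plain configuration.

```python
import os

import yaml  # PyYAML, the same library the question's DAG imports


def load_cluster_config(dag_dir, file_name="cluster_config.yml"):
    """Hypothetical helper: load the YAML file in dag_dir and return its 'configs' mapping."""
    config_path = os.path.join(dag_dir, file_name)
    with open(config_path, "r") as ymlfile:
        # safe_load parses plain YAML without constructing arbitrary Python objects
        cfg = yaml.safe_load(ymlfile)
    return cfg["configs"]
```

Inside the DAG file, one possible call would be load_cluster_config(os.path.dirname(os.path.abspath(__file__))), so the path follows the DAG file wherever the dags/ folder is mounted. The returned mapping could then supply values such as cluster_name and project_id to the operator.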