Using Operators in Airflow


  An operator represents a single task that is ideally idempotent. Operators determine what actually gets executed when a DAG runs. For more information, see the Operators Concepts documentation and the Operators API Reference.

Contents

  1. BashOperator
    1) Templating
    2) Troubleshooting
        a. Jinja template not found
  2. PythonOperator
    1) Passing arguments
    2) Templating
  3. ShortCircuitOperator: continues with downstream tasks when the condition holds, otherwise skips all downstream tasks
  4. SparkSubmitOperator
  5. SqoopOperator
  6. MySqlOperator
  7. SubDagOperator
  8. DummyOperator
  9. BranchPythonOperator
 
1. BashOperator

  Use the BashOperator to execute commands in a Bash shell.

from airflow.operators.bash_operator import BashOperator

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag)

  1) Templating

  You can use Jinja templating to parameterize the bash_command argument.

also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)

  2) Troubleshooting

    a. Jinja template not found

      When using the bash_command argument to invoke a Bash script directly, add a space after the script name. This is because Airflow tries to apply a Jinja template to the script path, which fails.

t2 = BashOperator(
    task_id='bash_example',

    # This fails with a `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",

    # This works (note the trailing space after the script name)
    bash_command="/home/batcher/test.sh ",
    dag=dag)

2. PythonOperator

  Use the PythonOperator to execute Python callables.

from airflow.operators.python_operator import PythonOperator

def print_context(ds, **kwargs):
    print(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

  1) Passing arguments

    Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

import time

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task

  2) Templating

  When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables, plus a templates_dict argument.

  The templates_dict argument is itself templated, so each value in the dictionary is evaluated as a Jinja template.
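  As a minimal sketch (the task id, dictionary key, and callable name here are illustrative, not from the original post):

def print_templated(ds, templates_dict=None, **kwargs):
    # templates_dict values have already been rendered by Jinja at this point
    print(templates_dict['exec_date'])

templated_task = PythonOperator(
    task_id='print_templated_values',
    provide_context=True,
    python_callable=print_templated,
    templates_dict={'exec_date': '{{ ds }}'},
    dag=dag,
)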

3. ShortCircuitOperator (short-circuit scheduling)

from airflow.operators.python_operator import ShortCircuitOperator

Arguments:

pool='adx',    # worker pool
priority_weight=8,    # priority weight
task_id='month_trigger_condition',    # task id
provide_context=True,
python_callable=date_tool.is_last_day_at_month,    # callable that evaluates the condition
dag=dag

  Example
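  The original example is not reproduced here; the following is a minimal sketch (the callable and task names are hypothetical). If the callable returns a falsy value, all downstream tasks are skipped; if it returns a truthy value, execution continues:

from datetime import date

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

def is_first_day_of_month(**kwargs):
    # Continue only on the first day of the month; otherwise skip downstream tasks
    return date.today().day == 1

month_trigger_condition = ShortCircuitOperator(
    task_id='month_trigger_condition',
    provide_context=True,
    python_callable=is_first_day_of_month,
    dag=dag,
)

monthly_stat = DummyOperator(task_id='monthly_stat', dag=dag)

# monthly_stat runs only when the condition above returns True
month_trigger_condition >> monthly_stat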

4. SparkSubmitOperator

# airflow v1.10.1
# /usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Parameter descriptions
    application=''
    conf=None
    conn_id='spark_default'
    files=None
    py_files=None
    driver_classpath=None
    jars=None
    java_class=None
    packages=None
    exclude_packages=None
    repositories=None
    total_executor_cores=None
    executor_cores=None
    executor_memory=None
    driver_memory=None
    keytab=None
    principal=None
    name='airflow-spark'
    num_executors=None
    application_args=None   # accepts only string values (a list of strings)
    env_vars=None # Environment variables for spark-submit. It supports yarn and k8s mode too.
    verbose=False

  Submitting a Spark job in yarn-cluster mode

  1) Configure the conn id: log in to the Airflow web UI, go to Admin -> Connections, and create a connection as follows

  

  Notes:

  a. The conn id must be uppercase

  b. host is required; it can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT, or k8s://https://<HOST>:<PORT>

  c. In extra you can set values for the queue, deploy-mode, spark-home, spark-binary, and namespace properties, but note that the separator in deploy-mode, spark-home, and spark-binary is a hyphen, not an underscore (see the sketch below)
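  For reference, a connection configured this way might look like the following sketch (the field values are illustrative, not taken from the original screenshot):

Conn Id:   adx_request
Conn Type: Spark
Host:      yarn
Extra:     {"queue": "default", "deploy-mode": "cluster", "spark-binary": "spark-submit"}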

  2) Add conn_id to the operator in the DAG; its value, adx_request, must match the connection configured above, as shown below

week_stat_filed_oper = SparkSubmitOperator(
    pool='adx',
    priority_weight=11,
    task_id="week_stat_filed_%s" % project,
    conf={
        "spark.network.timeout": 3600,
        "spark.sql.shuffle.partitions": 1000,
        "spark.yarn.executor.memoryOverhead": 8192,
        "spark.default.parallelism": 2000,
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.memory.useLegacyMode": True,
        "spark.storage.memoryFraction": 0.6,
        "spark.shuffle.memoryFraction": 0.2,
        "spark.storage.unrollFraction": 0.2,
        "spark.shuffle.io.maxRetries": 60,
        "spark.shuffle.io.retryWait": 60,
        "spark.sql.windowExec.buffer.spill.threshold": 1000000,
        "spark.shuffle.sort.bypassMergeThreshold": 10000,
        "spark.memory.offHeap.enabled": True,
        "spark.memory.offHeap.size": "16G",
        "spark.hive.exec.orc.default.buffer.size": 16384,
    },
    executor_cores=day_stat_resource["executor_cores"],
    num_executors=30,
    executor_memory='16G',
    driver_memory='4G',
    application="/home/sdev/liujichao/online/adx_stat_2.11-0.3-SNAPSHOT.jar",
    java_class=stat_class,
    name="day_stat_filed_%s_%s" % (project, week),
    application_args=[project, day, 'week', run_mode, '--job_names', 'statRequestFiled'],
    conn_id='adx_request',
    jars=jars,
    dag=dag,
    verbose=True)

5. SqoopOperator

from airflow.contrib.operators.sqoop_operator import SqoopOperator
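
  The original post stops at the import; the following is a minimal sketch of an import job (the connection id, table, and target directory are hypothetical), assuming the contrib SqoopOperator's import-mode parameters:

import_table = SqoopOperator(
    task_id='sqoop_import_example',
    conn_id='sqoop_default',      # a Sqoop connection configured in Admin -> Connections
    cmd_type='import',            # 'import' or 'export'
    table='example_table',        # source table to import
    target_dir='/user/hive/warehouse/example_table',  # HDFS target directory
    num_mappers=1,
    dag=dag,
)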

6. MySqlOperator

from airflow.operators.mysql_operator import MySqlOperator
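
  The original post stops at the import; as a minimal sketch (the connection id and SQL are hypothetical), the operator runs a SQL statement against a MySQL connection:

create_table = MySqlOperator(
    task_id='create_table_example',
    mysql_conn_id='mysql_default',   # a MySQL connection configured in Admin -> Connections
    sql='CREATE TABLE IF NOT EXISTS example_table (id INT, name VARCHAR(64))',
    dag=dag,
)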

7. SubDagOperator

from airflow.operators.subdag_operator import SubDagOperator
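
  The original post stops at the import; a minimal sketch follows (the factory function and task names are hypothetical). The key constraint is that the sub-DAG's dag_id must be '<parent_dag_id>.<task_id of the SubDagOperator>':

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

def sub_dag_factory(parent_dag_id, child_task_id, args):
    # The sub-DAG's dag_id must be '<parent_dag_id>.<child_task_id>'
    sub_dag = DAG(dag_id='%s.%s' % (parent_dag_id, child_task_id), default_args=args)
    DummyOperator(task_id='sub_task', dag=sub_dag)
    return sub_dag

section_1 = SubDagOperator(
    task_id='section_1',
    subdag=sub_dag_factory(dag.dag_id, 'section_1', dag.default_args),
    dag=dag,
)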

8. DummyOperator

from airflow.operators.dummy_operator import DummyOperator   
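
  The original post stops at the import; DummyOperator executes nothing and is typically used as a placeholder or as a start/join point for other tasks. A minimal sketch (task names are hypothetical):

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
some_work = DummyOperator(task_id='some_work', dag=dag)  # stand-in for a real task

# start and end execute nothing; they only mark the boundaries of the pipeline
start >> some_work >> end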

9. BranchPythonOperator

from airflow.operators.python_operator import BranchPythonOperator
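
  The original post stops at the import; a minimal sketch (the task names and branching rule are hypothetical). The callable returns the task_id of the branch to follow, and the other branches are skipped:

from airflow.operators.dummy_operator import DummyOperator

def choose_branch(**kwargs):
    # Return the task_id of the downstream task that should run
    if kwargs['execution_date'].day == 1:
        return 'monthly_task'
    return 'daily_task'

branching = BranchPythonOperator(
    task_id='branching',
    provide_context=True,
    python_callable=choose_branch,
    dag=dag,
)

daily_task = DummyOperator(task_id='daily_task', dag=dag)
monthly_task = DummyOperator(task_id='monthly_task', dag=dag)

branching >> [daily_task, monthly_task]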

