Airflow submits Spark jobs in yarn-client mode and cannot schedule them in yarn-cluster mode

刘超

I. Description

  This is still about the "org.apache.hadoop.hive.ql.exec.mr.MapRedTask. GC overhead limit exceeded" problem. After setting hive.exec.orc.split.strategy to BI, the job failed again with "No space left on device". By default Airflow submits Spark jobs in yarn-client mode, so the driver starts on the node where Airflow itself runs, and that node does not have enough memory. If the driver were scheduled onto YARN instead, i.e. if the underlying spark-submit ran with --master yarn --deploy-mode cluster rather than the default client deploy mode, the problem should go away. The rest of this post looks at how to make Airflow submit Spark jobs in yarn-cluster mode.

II. Analysis

  1. The DAG file adx_request.py imports the operator with from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator, so the first stop is the corresponding file /usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py. Its parameter documentation describes env_vars=None as "Environment variables for spark-submit. It supports yarn and k8s mode too", so I added env_vars={"--master": "yarn", "--deploy-mode": "cluster"} to the DAG. The full call looks like this:

week_stat_filed_oper = SparkSubmitOperator(
    pool='adx', priority_weight=11, task_id="week_stat_filed_%s" % project,
    conf={"spark.network.timeout": 3600, "spark.sql.shuffle.partitions": 1000,
          "spark.yarn.executor.memoryOverhead": 8192, "spark.default.parallelism": 2000,
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
          "spark.memory.useLegacyMode": True, "spark.storage.memoryFraction": 0.6,
          "spark.shuffle.memoryFraction": 0.2, "spark.storage.unrollFraction": 0.2,
          "spark.shuffle.io.maxRetries": 60, "spark.shuffle.io.retryWait": 60,
          "spark.sql.windowExec.buffer.spill.threshold": 1000000,
          "spark.shuffle.sort.bypassMergeThreshold": 10000,
          "spark.memory.offHeap.enabled": True, "spark.memory.offHeap.size": "16G",
          "spark.hive.exec.orc.default.buffer.size": 16384},
    executor_cores=day_stat_resource["executor_cores"], num_executors=30,
    executor_memory='16G', driver_memory='4G',
    application="/home/sdev/liujichao/online/adx_stat_2.11-0.3-SNAPSHOT.jar",
    java_class=stat_class, name="day_stat_filed_%s_%s" % (project, week),
    application_args=[project, day, 'week', run_mode, '--job_names', 'statRequestFiled'],
    conn_id='adx_request',
    env_vars={"--master": "yarn", "--deploy-mode": "cluster"},
    jars=jars, dag=dag, verbose=True)

  The task log then contains the following:

 '--conf', 'spark.yarn.appMasterEnv.--deploy-mode=cluster', '--conf', 'spark.yarn.appMasterEnv.--master=yarn'

  So the job was still not submitted to YARN in cluster mode: the two "options" were simply rewritten as spark.yarn.appMasterEnv.* settings.
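  That behaviour makes sense once you look at what the hook does with env_vars when the master is yarn: judging from the log line above, each entry is not passed to spark-submit as an option but is prefixed with spark.yarn.appMasterEnv. and handed over as a --conf pair. A rough sketch of that mapping (a simplification for illustration, not the actual Airflow code):

def env_vars_to_conf_args(env_vars, is_yarn=True):
    # Sketch: in yarn mode every env_vars entry becomes
    # "--conf spark.yarn.appMasterEnv.KEY=VALUE", so "--master" and
    # "--deploy-mode" never reach spark-submit as real options.
    args = []
    if is_yarn:
        for key, value in env_vars.items():
            args += ["--conf", "spark.yarn.appMasterEnv.{}={}".format(key, value)]
    return args


# Produces the same --conf entries seen in the log above (dict order aside):
print(env_vars_to_conf_args({"--master": "yarn", "--deploy-mode": "cluster"}))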

  2. spark_submit_operator.py does no real work of its own; it exposes no master or deploy_mode argument and simply forwards everything to SparkSubmitHook:

sdev@n-adx-hadoop-client-3:$ cat /usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/spark_submit_operator.py
# -*- coding: utf-8 -*-
from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
from airflow.models import BaseOperator
from airflow.settings import WEB_COLORS
from airflow.utils.decorators import apply_defaults


class SparkSubmitOperator(BaseOperator):
    template_fields = ('_name', '_application_args', '_packages')
    ui_color = WEB_COLORS['LIGHTORANGE']

    ...

    def execute(self, context):
        """
        Call the SparkSubmitHook to run the provided spark job
        """
        self._hook = SparkSubmitHook(
            conf=self._conf,
            conn_id=self._conn_id,
            files=self._files,
            py_files=self._py_files,
            driver_classpath=self._driver_classpath,
            jars=self._jars,
            java_class=self._java_class,
            packages=self._packages,
            exclude_packages=self._exclude_packages,
            repositories=self._repositories,
            total_executor_cores=self._total_executor_cores,
            executor_cores=self._executor_cores,
            executor_memory=self._executor_memory,
            driver_memory=self._driver_memory,
            keytab=self._keytab,
            principal=self._principal,
            name=self._name,
            num_executors=self._num_executors,
            application_args=self._application_args,
            env_vars=self._env_vars,
            verbose=self._verbose
        )
        self._hook.submit(self._application)

    def on_kill(self):
        self._hook.on_kill()

  3. SparkSubmitHook lives in /usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py. Skimming the code, the master, queue and deploy mode are all resolved from the Airflow connection in the _resolve_connection method:

sdev@n-adx-hadoop-client-3:~$ cat /usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/spark_submit_hook.py
# -*- coding: utf-8 -*-
import os
import subprocess
import re
import time

from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.contrib.kubernetes import kube_client


class SparkSubmitHook(BaseHook, LoggingMixin):
    
    ...

    def __init__(self,
                 conf=None,
                 conn_id='spark_default',
                 files=None,
                 py_files=None,
                 driver_classpath=None,
                 jars=None,
                 java_class=None,
                 packages=None,
                 exclude_packages=None,
                 repositories=None,
                 total_executor_cores=None,
                 executor_cores=None,
                 executor_memory=None,
                 driver_memory=None,
                 keytab=None,
                 principal=None,
                 name='default-name',
                 num_executors=None,
                 application_args=None,
                 env_vars=None,
                 verbose=False):
        self._conf = conf
        self._conn_id = conn_id
        self._files = files
        self._py_files = py_files
        
        ...

        self._connection = self._resolve_connection()
        self._is_yarn = 'yarn' in self._connection['master']
        self._is_kubernetes = 'k8s' in self._connection['master']
        
        ...

    def _resolve_should_track_driver_status(self):
        return ('spark://' in self._connection['master'] and
                self._connection['deploy_mode'] == 'cluster')

    def _resolve_connection(self):
        # Build from connection master or default to yarn if not available
        conn_data = {'master': 'yarn',
                     'queue': None,
                     'deploy_mode': None,
                     'spark_home': None,
                     'spark_binary': 'spark-submit',
                     'namespace': 'default'}

        try:
            # Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT and
            # k8s://https://<HOST>:<PORT>
            conn = self.get_connection(self._conn_id)
            if conn.port:
                conn_data['master'] = "{}:{}".format(conn.host, conn.port)
            else:
                conn_data['master'] = conn.host

            # Determine optional yarn queue from the extra field
            extra = conn.extra_dejson
            conn_data['queue'] = extra.get('queue', None)
            conn_data['deploy_mode'] = extra.get('deploy-mode', None)
            conn_data['spark_home'] = extra.get('spark-home', None)
            conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
            conn_data['namespace'] = extra.get('namespace', 'default')
        except AirflowException:
            self.log.debug(
                "Could not load connection string %s, defaulting to %s",
                self._conn_id, conn_data['master']
            )

        return conn_data
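
  So the master and deploy mode come from the Airflow connection, not from the operator's arguments. Conceptually, the resolved conn_data is then turned into spark-submit options along the following lines (a simplified sketch of the idea; the real command builder in spark_submit_hook.py handles many more options):

def connection_to_submit_args(conn_data):
    # Simplified sketch: map the dict returned by _resolve_connection
    # onto the corresponding spark-submit options.
    args = ["--master", conn_data["master"]]
    if conn_data.get("queue"):
        args += ["--queue", conn_data["queue"]]
    if conn_data.get("deploy_mode"):
        args += ["--deploy-mode", conn_data["deploy_mode"]]
    return args


# With a connection whose host is "yarn" and whose extra sets deploy-mode
# to cluster, the job is submitted in yarn-cluster mode:
print(connection_to_submit_args({"master": "yarn", "queue": None, "deploy_mode": "cluster"}))
# ['--master', 'yarn', '--deploy-mode', 'cluster']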

  4. Next, look at the get_connection logic (get_connection is defined in /usr/local/lib/python2.7/dist-packages/airflow/hooks/base_hook.py):

sdev@n-adx-hadoop-client-3:~$ cat /usr/local/lib/python2.7/dist-packages/airflow/hooks/base_hook.py
# -*- coding: utf-8 -*-

...

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import random

from airflow.models import Connection
from airflow.exceptions import AirflowException
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'


class BaseHook(LoggingMixin):
    def __init__(self, source):
        pass

    @classmethod
    @provide_session
    def _get_connections_from_db(cls, conn_id, session=None):
        db = (
            session.query(Connection)
            .filter(Connection.conn_id == conn_id)
            .all()
        )
        session.expunge_all()
        if not db:
            raise AirflowException(
                "The conn_id `{0}` isn't defined".format(conn_id))
        return db

    @classmethod
    def _get_connection_from_env(cls, conn_id):
        environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
        conn = None
        if environment_uri:
            conn = Connection(conn_id=conn_id, uri=environment_uri)
        return conn

    @classmethod
    def get_connections(cls, conn_id):
        conn = cls._get_connection_from_env(conn_id)
        if conn:
            conns = [conn]
        else:
            conns = cls._get_connections_from_db(conn_id)
        return conns

    @classmethod
    def get_connection(cls, conn_id):
        conn = random.choice(cls.get_connections(conn_id))
        if conn.host:
            log = LoggingMixin().log
            log.info("Using connection to: %s", conn.host)
        return conn

  One point in the code above is worth calling out: when the connection is resolved from an environment variable, the conn id is converted to upper case (the variable looked up is AIRFLOW_CONN_ plus conn_id.upper()).
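
  As a side note, this also means the connection does not have to be created in the web UI at all. A minimal sketch of the environment-variable alternative, assuming the variable is visible to the Airflow scheduler and workers (the queue value is only an illustration):

# Hypothetical alternative to the web UI. get_connection('adx_request')
# upper-cases the conn id, finds AIRFLOW_CONN_ADX_REQUEST, and parses
# host="yarn" plus extra={"deploy-mode": "cluster", "queue": "root.adx"}
# out of the URI, which is exactly what _resolve_connection needs.
import os

os.environ["AIRFLOW_CONN_ADX_REQUEST"] = "spark://yarn?deploy-mode=cluster&queue=root.adx"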

  5. Now look at Connection, which is defined in /usr/local/lib/python2.7/dist-packages/airflow/models.py. The conn id can be configured in the web UI, and this is the logic that loads that configuration from the database:

class Connection(Base, LoggingMixin):
    
    __tablename__ = "connection"

    id = Column(Integer(), primary_key=True)
    conn_id = Column(String(ID_LEN))
    conn_type = Column(String(500))
    host = Column(String(500))
    schema = Column(String(500))
    login = Column(String(500))
    _password = Column('password', String(5000))
    port = Column(Integer())
    is_encrypted = Column(Boolean, unique=False, default=False)
    is_extra_encrypted = Column(Boolean, unique=False, default=False)
    _extra = Column('extra', String(5000))

    _types = [
        ('docker', 'Docker Registry',),
        ('fs', 'File (path)'),
        ('ftp', 'FTP',),
        ('google_cloud_platform', 'Google Cloud Platform'),
        ('hdfs', 'HDFS',),
        ('http', 'HTTP',),
        ('hive_cli', 'Hive Client Wrapper',),
        ('hive_metastore', 'Hive Metastore Thrift',),
        ('hiveserver2', 'Hive Server 2 Thrift',),
        ('jdbc', 'Jdbc Connection',),
        ('jenkins', 'Jenkins'),
        ('mysql', 'MySQL',),
        ('postgres', 'Postgres',),
        ('oracle', 'Oracle',),
        ('vertica', 'Vertica',),
        ('presto', 'Presto',),
        ('s3', 'S3',),
        ('samba', 'Samba',),
        ('sqlite', 'Sqlite',),
        ('ssh', 'SSH',),
        ('cloudant', 'IBM Cloudant',),
        ('mssql', 'Microsoft SQL Server'),
        ('mesos_framework-id', 'Mesos Framework ID'),
        ('jira', 'JIRA',),
        ('redis', 'Redis',),
        ('wasb', 'Azure Blob Storage'),
        ('databricks', 'Databricks',),
        ('aws', 'Amazon Web Services',),
        ('emr', 'Elastic MapReduce',),
        ('snowflake', 'Snowflake',),
        ('segment', 'Segment',),
        ('azure_data_lake', 'Azure Data Lake'),
        ('cassandra', 'Cassandra',),
    ]

    def __init__(
            self, conn_id=None, conn_type=None,
            host=None, login=None, password=None,
            schema=None, port=None, extra=None,
            uri=None):
        self.conn_id = conn_id
        if uri:
            self.parse_from_uri(uri)
        else:
            self.conn_type = conn_type
            self.host = host
            self.login = login
            self.password = password
            self.schema = schema
            self.port = port
            self.extra = extra

    def parse_from_uri(self, uri):
        temp_uri = urlparse(uri)
        hostname = temp_uri.hostname or ''
        if '%2f' in hostname:
            hostname = hostname.replace('%2f', '/').replace('%2F', '/')
        conn_type = temp_uri.scheme
        if conn_type == 'postgresql':
            conn_type = 'postgres'
        self.conn_type = conn_type
        self.host = hostname
        self.schema = temp_uri.path[1:]
        self.login = temp_uri.username
        self.password = temp_uri.password
        self.port = temp_uri.port
        if temp_uri.query:
            self.extra = json.dumps(dict(parse_qsl(temp_uri.query)))

    ...

    @property
    def extra_dejson(self):
        """Returns the extra property by deserializing json."""
        obj = {}
        if self.extra:
            try:
                obj = json.loads(self.extra)
            except Exception as e:
                self.log.exception(e)
                self.log.error("Failed parsing the json for conn_id %s", self.conn_id)

        return obj

  6. Log in to Airflow, go to Admin -> Connections, and create a connection, as shown below:

 [Screenshot: the Admin -> Connections "Create" form]

  Notes:

  a. The conn id must be in upper case.

  b. Host is required; it can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT or k8s://https://<HOST>:<PORT>.

  c. In Extra you can set values for the queue, deploy-mode, spark-home, spark-binary and namespace properties, but note that deploy-mode, spark-home and spark-binary are spelled with a hyphen, not an underscore (see the sketch after these notes).
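
  For reference, this is roughly what the Extra field can contain for this connection, with Host set to yarn (the queue value is only an illustration; the keys and their hyphenated spelling are what matter, and spark-home, spark-binary and namespace can be added the same way):

{"deploy-mode": "cluster", "queue": "root.adx"}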

  7. Add conn_id to the DAG with the value adx_request, as follows:

week_stat_filed_oper = SparkSubmitOperator(
    pool='adx', priority_weight=11, task_id="week_stat_filed_%s" % project,
    conf={"spark.network.timeout": 3600, "spark.sql.shuffle.partitions": 1000,
          "spark.yarn.executor.memoryOverhead": 8192, "spark.default.parallelism": 2000,
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
          "spark.memory.useLegacyMode": True, "spark.storage.memoryFraction": 0.6,
          "spark.shuffle.memoryFraction": 0.2, "spark.storage.unrollFraction": 0.2,
          "spark.shuffle.io.maxRetries": 60, "spark.shuffle.io.retryWait": 60,
          "spark.sql.windowExec.buffer.spill.threshold": 1000000,
          "spark.shuffle.sort.bypassMergeThreshold": 10000,
          "spark.memory.offHeap.enabled": True, "spark.memory.offHeap.size": "16G",
          "spark.hive.exec.orc.default.buffer.size": 16384},
    executor_cores=day_stat_resource["executor_cores"], num_executors=30,
    executor_memory='16G', driver_memory='4G',
    application="/home/sdev/liujichao/online/adx_stat_2.11-0.3-SNAPSHOT.jar",
    java_class=stat_class, name="day_stat_filed_%s_%s" % (project, week),
    application_args=[project, day, 'week', run_mode, '--job_names', 'statRequestFiled'],
    conn_id='adx_request',
    jars=jars, dag=dag, verbose=True)

  8. Rerun the task, click 1 and then 2 (in the screenshot below), and check the log: the job now runs in yarn-cluster mode.

 [Screenshot: rerunning the task and viewing its log]

III. Solution

  Two steps are required:

  1. Configure the conn id: log in to Airflow, go to Admin -> Connections, and create a connection, as shown below:

 [Screenshot: the Admin -> Connections "Create" form]

  Notes:

  a. The conn id must be in upper case.

  b. Host is required; it can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT or k8s://https://<HOST>:<PORT>.

  c. In Extra you can set values for the queue, deploy-mode, spark-home, spark-binary and namespace properties, but note that deploy-mode, spark-home and spark-binary are spelled with a hyphen, not an underscore.

  2. Add conn_id to the DAG with the value adx_request; it must match the connection configured above. For example:

week_stat_filed_oper = SparkSubmitOperator(
    pool='adx', priority_weight=11, task_id="week_stat_filed_%s" % project,
    conf={"spark.network.timeout": 3600, "spark.sql.shuffle.partitions": 1000,
          "spark.yarn.executor.memoryOverhead": 8192, "spark.default.parallelism": 2000,
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
          "spark.memory.useLegacyMode": True, "spark.storage.memoryFraction": 0.6,
          "spark.shuffle.memoryFraction": 0.2, "spark.storage.unrollFraction": 0.2,
          "spark.shuffle.io.maxRetries": 60, "spark.shuffle.io.retryWait": 60,
          "spark.sql.windowExec.buffer.spill.threshold": 1000000,
          "spark.shuffle.sort.bypassMergeThreshold": 10000,
          "spark.memory.offHeap.enabled": True, "spark.memory.offHeap.size": "16G",
          "spark.hive.exec.orc.default.buffer.size": 16384},
    executor_cores=day_stat_resource["executor_cores"], num_executors=30,
    executor_memory='16G', driver_memory='4G',
    application="/home/sdev/liujichao/online/adx_stat_2.11-0.3-SNAPSHOT.jar",
    java_class=stat_class, name="day_stat_filed_%s_%s" % (project, week),
    application_args=[project, day, 'week', run_mode, '--job_names', 'statRequestFiled'],
    conn_id='adx_request',
    jars=jars, dag=dag, verbose=True)


