我有触发一pyspark模块从气流使用sparksubmit操作员。 但是,pyspark模块需要采取的火花届会议变为一个参数。 我已经使用application_args通过的参数,以pyspark模块。 但是,当我跑dag火花提交操作者越来越失败和参数我通过在认为没有类型的变量。 需要知道如何通过一个参数pyspark模块触发通过spark_submit_operator.
该 DAG 的代码如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PRJT").enableHiveSupport().getOrCreate()
spark_config = {
'conn_id': 'spark_default',
'driver_memory': '1g',
'executor_cores': 1,
'num_executors': 1,
'executor_memory': '1g'
}
dag = DAG(
dag_id="spark_session_prgm",
default_args=default_args,
schedule_interval='@daily',
catchup=False)
spark_submit_task1 = SparkSubmitOperator(
task_id='spark_submit_task1',
application='/home/airflow_home/dags/tmp_spark_1.py',
application_args=['spark'],
**spark_config, dag=dag)
这样的代码中tmp_spark_1.py 程序: