Spark JDBC: printing the executed INSERT statement

刘超 · 1 month ago · 1717 reads

When Spark saves a DataFrame to MySQL through JDBC, the INSERT statements it executes are not logged anywhere, even with the log level lowered to DEBUG. So when an error like Incorrect string value: '\xC4\x81\xE1\xB8\x91' for column 'field_value' at row 14 comes up, there is no good way to locate the offending data. Below we modify the spark-sql source so that the failing INSERT statement gets printed.
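For context, the kind of write that runs into this looks roughly like the following sketch (the table name, connection URL, and credentials are all placeholders):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object JdbcWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-write").getOrCreate()
    import spark.implicits._

    // A multi-byte string like this is exactly the kind of value that fails
    // with "Incorrect string value" when the MySQL column is not utf8mb4.
    val df = Seq((1, "空军")).toDF("id", "field_value")

    val props = new Properties()
    props.setProperty("user", "sdev")   // placeholder credentials
    props.setProperty("password", "***")

    // This goes through JdbcUtils.saveTable -> savePartition, which
    // executes batched INSERTs without ever logging them.
    df.write.mode("append")
      .jdbc("jdbc:mysql://dbhost:3306/test", "some_table", props)
  }
}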

We know that in a Spring project log4j can be configured to print the executed SQL statements, so how do we get the same thing from spark-sql? I found no ready-made way online, so let's patch the spark-sql source.

1. In spark-sql_2.11-2.0.2.2.5.6.0-40.jar (with the jar associated to its corresponding xxx.source.jar in the IDE, so that opening a class loads it from spark-sql_2.11-2.0.2.2.5.6.0-40.source.jar), find the JdbcUtils class and locate the savePartition method, shown below:

[screenshot: the savePartition method in JdbcUtils]
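For orientation, the relevant part of savePartition in the Spark 2.0.x source is shaped roughly like this (abridged and paraphrased; exact line numbers differ between builds):

val stmt = insertStatement(conn, table, rddSchema)  // builds the PreparedStatement
try {
  var rowCount = 0
  while (iterator.hasNext) {
    val row = iterator.next()
    // ... bind each column of row onto stmt ...
    stmt.addBatch()
    rowCount += 1
    if (rowCount % batchSize == 0) {
      stmt.executeBatch()   // throws BatchUpdateException on bad data
      rowCount = 0
    }
  }
  if (rowCount > 0) {
    stmt.executeBatch()
  }
} finally {
  stmt.close()
}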

2. The class extends Logging, so logError and the other log methods can be used directly. At line 229, wrap the executeBatch call in the following catch block:

try {
  stmt.executeBatch()       // the batched INSERT that throws on bad data
} catch {
  case e: SQLException =>
    logError(stmt.toString) // Connector/J renders the statement with its bound values
    throw e
}
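This works because the MySQL Connector/J PreparedStatement.toString() renders the SQL together with the currently bound parameter values, which is exactly what shows up in the log output in step 5. One caveat: for a batched statement, toString() typically reflects only the most recently bound row, so the printed values are representative of the failing batch rather than an exact dump of it.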

After the change, the method looks like this:

[screenshot: savePartition with the added catch block]

3. Compile JdbcUtils. Unpack spark-sql_2.11-2.0.2.2.5.6.0-40.jar, copy the compiled class files into the matching package directory, and repack the jar, as sketched below.
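The recompile itself is easiest from the Spark 2.0.2 source tree or an IDE; for the repack, updating the jar in place with jar uf is equivalent to the unpack-copy-rezip described above (all paths here are illustrative):

# Run from the directory holding the recompiled classes in their package
# layout; note that Scala emits several class files for one .scala file
# (JdbcUtils.class, JdbcUtils$.class, anonymous-function classes, ...)
cd /path/to/compiled/classes
jar uf /path/to/spark-sql_2.11-2.0.2.2.5.6.0-40.jar \
    org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils*.class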

4. Replace the old jar under spark2/jars with the newly packed one, then tar all the jars in the spark2.0.2/jars directory into spark202-hdp-yarn-archive.tar.gz, as sketched below.
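A possible shape for this step, assuming an HDP-style install under /usr/hdp/current/spark2-client (the local paths are illustrative; the HDFS destination is the one referenced in step 5):

# Swap in the patched jar
cp spark-sql_2.11-2.0.2.2.5.6.0-40.jar /usr/hdp/current/spark2-client/jars/

# Re-archive every jar and push the archive to HDFS
cd /usr/hdp/current/spark2-client/jars
tar -zcf /tmp/spark202-hdp-yarn-archive.tar.gz *.jar
hdfs dfs -put -f /tmp/spark202-hdp-yarn-archive.tar.gz \
    hdfs://opera/user/sdev/software/spark2/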

5. Run spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.archive=hdfs://opera/user/sdev/software/spark2/spark202-hdp-yarn-archive.tar.gz. When the write fails, the INSERT statement is now printed, as shown below:

06:35:05,453 ERROR org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils     - com.mysql.jdbc.JDBC4PreparedStatement@2e530e95: INSERT INTO r_day_stat_survey (`test`,`ad_id`,`mid`,`question_index`,`question_type`,`pv_info`,`uv_info`,`day`) VALUES (3,'a2662238578240','m2662238578304','0','LEADS','{"total":1,"PHONE":["5555"],"NAME":["?????"]}','{"total":1,"PHONE":["5555"],"NAME":["?????"]}','20200311')
06:35:05,455 ERROR org.apache.spark.executor.Executor                            - Exception in task 185.0 in stage 11.0 (TID 1096)
java.sql.BatchUpdateException: Incorrect string value: '\xE7\xA9\xBA\xE5\x86\x9B...' for column 'pv_info' at row 1
    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(PreparedStatement.java:1809)
    at com.mysql.jdbc.PreparedStatement.executeBatch(PreparedStatement.java:1441)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:227)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:305)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:304)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)

Note: the job runs in yarn-cluster mode here so that the spark.yarn.archive setting takes effect.
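For reference, a complete submit command might look like this (the main class and application jar below are placeholders):

# main class and application jar are placeholders
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.archive=hdfs://opera/user/sdev/software/spark2/spark202-hdp-yarn-archive.tar.gz \
    --class com.example.StatJob \
    /path/to/your-job.jar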

