Spark: return an empty DataFrame with the Hive schema when reading a nonexistent file


  Tested on Spark 2.0.2.

  Spark throws an error when reading a file that does not exist. When the file is missing, wouldn't it be nicer to return an empty DataFrame carrying the Hive schema instead? The idea is as follows:

Fetch the Hive table description and convert it into a schema, then rework the baseRelationToDataFrame logic: pass false to resolveRelation, meaning that when the files do not exist an empty DataFrame is returned.

  The code is roughly as follows. Note that it has not been fully tested yet; there is other work to do right now, and even once it passes, it cannot go live immediately without per-scenario testing. That can wait; for now it should at least sit behind a switch. This post is just to record the idea.

import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types._

// Column descriptions (name, Hive type string) for the target table, taken from the catalog
var columns = spark.catalog.listColumns("adx", "log_adx_click").select("name", "dataType").collect()
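
For reference, the rows collected above carry the Hive type names as plain strings; a quick way to inspect them (the output is whatever the real table reports, not shown here):

columns.foreach { row =>
  // Type names are Hive/SQL style strings such as "bigint" or "map<string,int>",
  // not Spark DataType objects
  println(s"${row.getString(0)} -> ${row.getString(1)}")
}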

// Mapping from Hive/catalog type names to Spark SQL DataTypes
val typeMap = Map[String, DataType](
  "string"           -> StringType,
  "short"            -> ShortType,
  "integer"          -> IntegerType,
  "int"              -> IntegerType,
  "long"             -> LongType,
  "bigint"           -> LongType,
  "float"            -> FloatType,
  "double"           -> DoubleType,
  "boolean"          -> BooleanType,
  "byte"             -> ByteType,
  "binary"           -> BinaryType,
  "date"             -> DateType,
  "timestamp"        -> TimestampType,
  "calendarinterval" -> CalendarIntervalType,
  "null"             -> NullType
)

// Recursively convert the catalog column descriptions into a Spark DataType.
// Still a rough draft, not fully tested.
def array2struct(arr: Any): DataType = {
  arr match {
    // An array of column rows / (name, type) pairs becomes a StructType
    case ja: Array[Any] =>
      // StructType is immutable, so re-assign on every add
      var struct1 = new StructType()
      ja.foreach {
        case (name: String, ctype: String) =>
          struct1 = struct1.add(name, typeMap(ctype))
        case jo: org.apache.spark.sql.Row =>
          println("---------")
          println(jo)
          println(jo.getClass.getName)
          jo.getAs[Any](1) match {
            case rm1: Map[Any, Any] =>
              struct1 = struct1.add(jo.getAs[String](0), array2struct(rm1))
            case ro1: Any =>
              struct1 = struct1.add(jo.getAs[String](0), typeMap(ro1.toString))
          }
      }
      struct1
    // A (0 -> keyType, 1 -> valueType) map becomes a MapType
    case jm: Map[Any, Any] =>
      val key = array2struct(jm(0))
      val value = array2struct(jm(1))
      MapType(key, value)
    // A plain type name maps directly to a primitive DataType
    case js: String =>
      typeMap(js)
    case other: Any =>
      throw new RuntimeException(s"Not Row/Array, type: ${other.getClass}")
  }
}

// Build the relation by hand with the Hive-derived schema and pass false to
// resolveRelation so that missing files do not cause a failure
var df = spark.baseRelationToDataFrame(
  DataSource.apply(
    spark,
    "orc",
    Seq("hdfs://opera/apps/hive/warehouse/adx.db/log_adx_click/*/*/*/day=20200506"),
    userSpecifiedSchema = Option(array2struct(columns).asInstanceOf[StructType])
  ).resolveRelation(false))
df.show(3)
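
The switch mentioned earlier could simply be a session config that decides whether to go down this path or through the normal reader. A rough sketch; the config key spark.custom.read.allowMissingPath is made up here purely for illustration:

// Hypothetical config key, used only for illustration
val allowMissingPath =
  spark.conf.get("spark.custom.read.allowMissingPath", "false").toBoolean

val path = "hdfs://opera/apps/hive/warehouse/adx.db/log_adx_click/*/*/*/day=20200506"
val hiveSchema = array2struct(columns).asInstanceOf[StructType]

val clickDf = if (allowMissingPath) {
  // Skip the existence check: missing files yield an empty DataFrame with the Hive schema
  spark.baseRelationToDataFrame(
    DataSource.apply(spark, "orc", Seq(path), userSpecifiedSchema = Option(hiveSchema))
      .resolveRelation(false))
} else {
  // Normal path: let Spark fail if the files do not exist
  spark.read.schema(hiveSchema).orc(path)
}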

  An alternative is to use DataType.fromJson() in Spark 2.0.2 to turn a JSON string into a schema. That works for primitive types, but not for complex types such as map, where it throws the error shown below.

// Build a JSON schema string by hand from the catalog column descriptions,
// skipping partition columns
var fields = spark.catalog.listColumns("adx", "log_adx_click")
  .select("name", "dataType", "nullable", "isPartition")
  .collect()
  .map(t => {
    // keep only non-partition fields
    if (!t.getBoolean(3)) {
      var dataType = t.getString(1) match {
        case "bigint" => "long"
        case "int" => "integer"
        // complex type: this substitution is what blows up below
        case "map<string,int>" => scala.collection.Map
        case _ => t.getString(1)
      }
      "{\"name\":\"" + t.getString(0) + "\",\"type\":\"" + dataType + "\",\"nullable\":" + t.getBoolean(2) + "}"
    } else {
      None
    }
  })
  .filter(v => v != None)
var jsonSchema = "{\"type\":\"struct\",\"fields\":[" + fields.mkString(",") + "]}"

import org.apache.spark.sql.types.{DataType, StructType}
val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]

scala> val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
java.util.NoSuchElementException: key not found: scala.collection.Map$@3fea0eb1
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
  at org.apache.spark.sql.types.DataType$.nameToType(DataType.scala:113)
  at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:127)
  at org.apache.spark.sql.types.DataType$.org$apache$spark$sql$types$DataType$$parseStructField(DataType.scala:176)
  at org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:145)
  at org.apache.spark.sql.types.DataType$$anonfun$parseDataType$1.apply(DataType.scala:145)
  at scala.collection.immutable.List.map(List.scala:277)
  at org.apache.spark.sql.types.DataType$.parseDataType(DataType.scala:145)
  at org.apache.spark.sql.types.DataType$.fromJson(DataType.scala:99)
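
The failure happens because, for a complex column, fromJson does not accept a plain type-name string (let alone the Map companion object the snippet substitutes); it expects a nested JSON object describing the map. Two possible ways around it, sketched here and not verified on 2.0.2: print the JSON of a programmatically built schema to see the expected shape, or let Catalyst's (internal) SQL parser parse the Hive-style type string directly.

import org.apache.spark.sql.types._

// Build the map column programmatically and print its JSON to see what
// fromJson expects for a complex type: a nested object, not a name string.
val ref = new StructType().add("ext", MapType(StringType, IntegerType))
println(ref.json)
// roughly: {"type":"struct","fields":[{"name":"ext","type":{"type":"map","keyType":"string",
//           "valueType":"integer","valueContainsNull":true},"nullable":true,"metadata":{}}]}

// Alternatively, parse the Hive-style type string with Catalyst's SQL parser
// (an internal API), which avoids hand-building JSON for complex columns.
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
val mapType = CatalystSqlParser.parseDataType("map<string,int>")  // MapType(StringType,IntegerType,true)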
