A summary of org.apache.spark.sql.functions in Spark SQL
Note: the listings below are based on Scala 2.12.12 and Spark 3.0.1, the latest release at the time of writing.
1. Sort functions
def asc(columnName: String): Column = Column(columnName).asc
def asc_nulls_first(columnName: String): Column = Column(columnName).asc_nulls_first
def asc_nulls_last(columnName: String): Column = Column(columnName).asc_nulls_last
def desc(columnName: String): Column = Column(columnName).desc
def desc_nulls_first(columnName: String): Column = Column(columnName).desc_nulls_first
def desc_nulls_last(columnName: String): Column = Column(columnName).desc_nulls_last
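A quick usage sketch (not part of the source listing): the local session, app name and the name/age sample data below are invented for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("sort-demo").getOrCreate()
import spark.implicits._

// Hypothetical sample data.
val df = Seq(("alice", 30), ("bob", 25), ("carol", 35)).toDF("name", "age")

// Sort by age descending, then by name ascending with nulls last.
df.orderBy(desc("age"), asc_nulls_last("name")).show()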
2. Aggregate functions
Aggregate functions, as the name suggests, are used to aggregate data. In practice they typically appear after a group by to compute statistics for each group. Note the contrast with window functions.
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct
(e
: Column
): Column
= approx_count_distinct
(e
)
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct
(columnName
: String): Column
= approx_count_distinct
(columnName
)
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct
(e
: Column
, rsd
: Double): Column
= approx_count_distinct
(e
, rsd
)
@deprecated("Use approx_count_distinct", "2.1.0")
def approxCountDistinct
(columnName
: String, rsd
: Double): Column
= {
approx_count_distinct
(Column
(columnName
), rsd
)
}
def approx_count_distinct
(e
: Column
): Column
= withAggregateFunction
{
HyperLogLogPlusPlus
(e
.expr
)
}
def approx_count_distinct
(columnName
: String): Column
= approx_count_distinct
(column
(columnName
))
def approx_count_distinct
(e
: Column
, rsd
: Double): Column
= withAggregateFunction
{
HyperLogLogPlusPlus
(e
.expr
, rsd
, 0, 0)
}
def approx_count_distinct
(columnName
: String, rsd
: Double): Column
= {
approx_count_distinct
(Column
(columnName
), rsd
)
}
def avg
(e
: Column
): Column
= withAggregateFunction
{ Average
(e
.expr
) }
def avg
(columnName
: String): Column
= avg
(Column
(columnName
))
def collect_list
(e
: Column
): Column
= withAggregateFunction
{ CollectList
(e
.expr
) }
def collect_list
(columnName
: String): Column
= collect_list
(Column
(columnName
))
def collect_set
(e
: Column
): Column
= withAggregateFunction
{ CollectSet
(e
.expr
) }
def collect_set
(columnName
: String): Column
= collect_set
(Column
(columnName
))
def corr
(column1
: Column
, column2
: Column
): Column
= withAggregateFunction
{
Corr
(column1
.expr
, column2
.expr
)
}
def corr
(columnName1
: String, columnName2
: String): Column
= {
corr
(Column
(columnName1
), Column
(columnName2
))
}
def count
(e
: Column
): Column
= withAggregateFunction
{
e
.expr
match {
case s
: Star
=> Count
(Literal
(1))
case _
=> Count
(e
.expr
)
}
}
def count
(columnName
: String): TypedColumn
[Any, Long] =
count
(Column
(columnName
)).as
(ExpressionEncoder
[Long]())
@scala.annotation
.varargs
def countDistinct
(expr
: Column
, exprs
: Column
*): Column
=
Column
(UnresolvedFunction
("count", (expr
+: exprs
).map
(_
.expr
), isDistinct
= true))
@scala.annotation
.varargs
def countDistinct
(columnName
: String, columnNames
: String*): Column
=
countDistinct
(Column
(columnName
), columnNames
.map
(Column
.apply
) : _
*)
def covar_pop
(column1
: Column
, column2
: Column
): Column
= withAggregateFunction
{
CovPopulation
(column1
.expr
, column2
.expr
)
}
def covar_pop
(columnName1
: String, columnName2
: String): Column
= {
covar_pop
(Column
(columnName1
), Column
(columnName2
))
}
def covar_samp
(column1
: Column
, column2
: Column
): Column
= withAggregateFunction
{
CovSample
(column1
.expr
, column2
.expr
)
}
def covar_samp
(columnName1
: String, columnName2
: String): Column
= {
covar_samp
(Column
(columnName1
), Column
(columnName2
))
}
def first
(e
: Column
, ignoreNulls
: Boolean): Column
= withAggregateFunction
{
First
(e
.expr
, ignoreNulls
)
}
def first
(columnName
: String, ignoreNulls
: Boolean): Column
= {
first
(Column
(columnName
), ignoreNulls
)
}
def first
(e
: Column
): Column
= first
(e
, ignoreNulls
= false)
def first
(columnName
: String): Column
= first
(Column
(columnName
))
def grouping
(e
: Column
): Column
= Column
(Grouping
(e
.expr
))
def grouping
(columnName
: String): Column
= grouping
(Column
(columnName
))
def grouping_id
(cols
: Column
*): Column
= Column
(GroupingID
(cols
.map
(_
.expr
)))
def grouping_id
(colName
: String, colNames
: String*): Column
= {
grouping_id
((Seq
(colName
) ++ colNames
).map
(n
=> Column
(n
)) : _
*)
}
def kurtosis
(e
: Column
): Column
= withAggregateFunction
{ Kurtosis
(e
.expr
) }
def kurtosis
(columnName
: String): Column
= kurtosis
(Column
(columnName
))
def last
(e
: Column
, ignoreNulls
: Boolean): Column
= withAggregateFunction
{
new Last
(e
.expr
, ignoreNulls
)
}
def last
(columnName
: String, ignoreNulls
: Boolean): Column
= {
last
(Column
(columnName
), ignoreNulls
)
}
def last
(e
: Column
): Column
= last
(e
, ignoreNulls
= false)
def last
(columnName
: String): Column
= last
(Column
(columnName
), ignoreNulls
= false)
def max
(e
: Column
): Column
= withAggregateFunction
{ Max
(e
.expr
) }
def max
(columnName
: String): Column
= max
(Column
(columnName
))
def mean
(e
: Column
): Column
= avg
(e
)
def mean
(columnName
: String): Column
= avg
(columnName
)
def min
(e
: Column
): Column
= withAggregateFunction
{ Min
(e
.expr
) }
def min
(columnName
: String): Column
= min
(Column
(columnName
))
def skewness
(e
: Column
): Column
= withAggregateFunction
{ Skewness
(e
.expr
) }
def skewness
(columnName
: String): Column
= skewness
(Column
(columnName
))
def stddev
(e
: Column
): Column
= withAggregateFunction
{ StddevSamp
(e
.expr
) }
def stddev
(columnName
: String): Column
= stddev
(Column
(columnName
))
def stddev_samp
(e
: Column
): Column
= withAggregateFunction
{ StddevSamp
(e
.expr
) }
def stddev_samp
(columnName
: String): Column
= stddev_samp
(Column
(columnName
))
def stddev_pop
(e
: Column
): Column
= withAggregateFunction
{ StddevPop
(e
.expr
) }
def stddev_pop
(columnName
: String): Column
= stddev_pop
(Column
(columnName
))
def sum
(e
: Column
): Column
= withAggregateFunction
{ Sum
(e
.expr
) }
def sum
(columnName
: String): Column
= sum
(Column
(columnName
))
def sumDistinct
(e
: Column
): Column
= withAggregateFunction
(Sum
(e
.expr
), isDistinct
= true)
def sumDistinct
(columnName
: String): Column
= sumDistinct
(Column
(columnName
))
def variance
(e
: Column
): Column
= withAggregateFunction
{ VarianceSamp
(e
.expr
) }
def variance
(columnName
: String): Column
= variance
(Column
(columnName
))
def var_samp
(e
: Column
): Column
= withAggregateFunction
{ VarianceSamp
(e
.expr
) }
def var_samp
(columnName
: String): Column
= var_samp
(Column
(columnName
))
def var_pop
(e
: Column
): Column
= withAggregateFunction
{ VariancePop
(e
.expr
) }
def var_pop
(columnName
: String): Column
= var_pop
(Column
(columnName
))
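A hedged usage sketch of aggregates after groupBy; the sales data and column names below are invented for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
import spark.implicits._

// Hypothetical sales data.
val sales = Seq(("east", "a", 10.0), ("east", "b", 20.0), ("west", "a", 5.0))
  .toDF("region", "product", "amount")

sales.groupBy("region").agg(
  count("*").as("rows"),
  sum("amount").as("total"),
  avg("amount").as("avg_amount"),
  countDistinct("product").as("products"),
  collect_list("product").as("product_list")
).show(truncate = false)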
3. Window functions and non-aggregate functions
def cume_dist(): Column = withExpr { new CumeDist }

def dense_rank(): Column = withExpr { new DenseRank }

def lag(e: Column, offset: Int): Column = lag(e, offset, null)
def lag(columnName: String, offset: Int): Column = lag(columnName, offset, null)

def lag(columnName: String, offset: Int, defaultValue: Any): Column = {
  lag(Column(columnName), offset, defaultValue)
}

def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
  Lag(e.expr, Literal(offset), Literal(defaultValue))
}

def lead(columnName: String, offset: Int): Column = { lead(columnName, offset, null) }
def lead(e: Column, offset: Int): Column = { lead(e, offset, null) }

def lead(columnName: String, offset: Int, defaultValue: Any): Column = {
  lead(Column(columnName), offset, defaultValue)
}

def lead(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
  Lead(e.expr, Literal(offset), Literal(defaultValue))
}

def ntile(n: Int): Column = withExpr { new NTile(Literal(n)) }

def percent_rank(): Column = withExpr { new PercentRank }

def rank(): Column = withExpr { new Rank }

def row_number(): Column = withExpr { RowNumber() }
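A small sketch of the analytic window functions; they are always used over a WindowSpec (org.apache.spark.sql.expressions.Window). The class/score sample data is made up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("window-demo").getOrCreate()
import spark.implicits._

val scores = Seq(("a", 1, 90), ("a", 2, 80), ("b", 1, 70)).toDF("cls", "id", "score")

// Window functions need a partitioning / ordering spec.
val w = Window.partitionBy("cls").orderBy(desc("score"))

scores.select(
  col("cls"), col("id"), col("score"),
  row_number().over(w).as("rn"),
  rank().over(w).as("rank"),
  lag("score", 1).over(w).as("prev_score")
).show()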
@scala.annotation.varargs
def array(cols: Column*): Column = withExpr { CreateArray(cols.map(_.expr)) }

@scala.annotation.varargs
def array(colName: String, colNames: String*): Column = {
  array((colName +: colNames).map(col) : _*)
}

@scala.annotation.varargs
def map(cols: Column*): Column = withExpr { CreateMap(cols.map(_.expr)) }

def map_from_arrays(keys: Column, values: Column): Column = withExpr {
  MapFromArrays(keys.expr, values.expr)
}

def broadcast[T](df: Dataset[T]): Dataset[T] = {
  Dataset[T](df.sparkSession,
    ResolvedHint(df.logicalPlan, HintInfo(strategy = Some(BROADCAST))))(df.exprEnc)
}

@scala.annotation.varargs
def coalesce(e: Column*): Column = withExpr { Coalesce(e.map(_.expr)) }

def input_file_name(): Column = withExpr { InputFileName() }

def isnan(e: Column): Column = withExpr { IsNaN(e.expr) }
def isnull(e: Column): Column = withExpr { IsNull(e.expr) }

@deprecated("Use monotonically_increasing_id()", "2.0.0")
def monotonicallyIncreasingId(): Column = monotonically_increasing_id()

def monotonically_increasing_id(): Column = withExpr { MonotonicallyIncreasingID() }

def nanvl(col1: Column, col2: Column): Column = withExpr { NaNvl(col1.expr, col2.expr) }

def negate(e: Column): Column = -e
def not(e: Column): Column = !e

def rand(seed: Long): Column = withExpr { Rand(seed) }
def rand(): Column = rand(Utils.random.nextLong)

def randn(seed: Long): Column = withExpr { Randn(seed) }
def randn(): Column = randn(Utils.random.nextLong)

def spark_partition_id(): Column = withExpr { SparkPartitionID() }

def sqrt(e: Column): Column = withExpr { Sqrt(e.expr) }
def sqrt(colName: String): Column = sqrt(Column(colName))

@scala.annotation.varargs
def struct(cols: Column*): Column = withExpr { CreateStruct.create(cols.map(_.expr)) }

@scala.annotation.varargs
def struct(colName: String, colNames: String*): Column = {
  struct((colName +: colNames).map(col) : _*)
}

def when(condition: Column, value: Any): Column = withExpr {
  CaseWhen(Seq((condition.expr, lit(value).expr)))
}

def bitwiseNOT(e: Column): Column = withExpr { BitwiseNot(e.expr) }

def expr(expr: String): Column = {
  val parser = SparkSession.getActiveSession.map(_.sessionState.sqlParser).getOrElse {
    new SparkSqlParser(new SQLConf)
  }
  Column(parser.parseExpression(expr))
}
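A short sketch of a few non-aggregate helpers (when/otherwise, coalesce, expr, monotonically_increasing_id); the people data is only illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("misc-demo").getOrCreate()
import spark.implicits._

val people = Seq(("alice", Some(30)), ("bob", None)).toDF("name", "age")

people.select(
  col("name"),
  // when/otherwise builds a CASE WHEN expression.
  when(col("age").isNull, lit(-1)).otherwise(col("age")).as("age_or_default"),
  // coalesce returns the first non-null argument.
  coalesce(col("age"), lit(0)).as("age_or_zero"),
  // expr parses a SQL expression string into a Column.
  expr("upper(name)").as("name_upper"),
  monotonically_increasing_id().as("row_id")
).show()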
4. Math functions
def abs(e: Column): Column = withExpr { Abs(e.expr) }

def acos(e: Column): Column = withExpr { Acos(e.expr) }
def acos(columnName: String): Column = acos(Column(columnName))

def asin(e: Column): Column = withExpr { Asin(e.expr) }
def asin(columnName: String): Column = asin(Column(columnName))

def atan(e: Column): Column = withExpr { Atan(e.expr) }
def atan(columnName: String): Column = atan(Column(columnName))

def atan2(y: Column, x: Column): Column = withExpr { Atan2(y.expr, x.expr) }
def atan2(y: Column, xName: String): Column = atan2(y, Column(xName))
def atan2(yName: String, x: Column): Column = atan2(Column(yName), x)
def atan2(yName: String, xName: String): Column = atan2(Column(yName), Column(xName))
def atan2(y: Column, xValue: Double): Column = atan2(y, lit(xValue))
def atan2(yName: String, xValue: Double): Column = atan2(Column(yName), xValue)
def atan2(yValue: Double, x: Column): Column = atan2(lit(yValue), x)
def atan2(yValue: Double, xName: String): Column = atan2(yValue, Column(xName))

def bin(e: Column): Column = withExpr { Bin(e.expr) }
def bin(columnName: String): Column = bin(Column(columnName))

def cbrt(e: Column): Column = withExpr { Cbrt(e.expr) }
def cbrt(columnName: String): Column = cbrt(Column(columnName))

def ceil(e: Column): Column = withExpr { Ceil(e.expr) }
def ceil(columnName: String): Column = ceil(Column(columnName))

def conv(num: Column, fromBase: Int, toBase: Int): Column = withExpr {
  Conv(num.expr, lit(fromBase).expr, lit(toBase).expr)
}

def cos(e: Column): Column = withExpr { Cos(e.expr) }
def cos(columnName: String): Column = cos(Column(columnName))

def cosh(e: Column): Column = withExpr { Cosh(e.expr) }
def cosh(columnName: String): Column = cosh(Column(columnName))

def exp(e: Column): Column = withExpr { Exp(e.expr) }
def exp(columnName: String): Column = exp(Column(columnName))

def expm1(e: Column): Column = withExpr { Expm1(e.expr) }
def expm1(columnName: String): Column = expm1(Column(columnName))

def factorial(e: Column): Column = withExpr { Factorial(e.expr) }

def floor(e: Column): Column = withExpr { Floor(e.expr) }
def floor(columnName: String): Column = floor(Column(columnName))

@scala.annotation.varargs
def greatest(exprs: Column*): Column = withExpr { Greatest(exprs.map(_.expr)) }

@scala.annotation.varargs
def greatest(columnName: String, columnNames: String*): Column = {
  greatest((columnName +: columnNames).map(Column.apply): _*)
}

def hex(column: Column): Column = withExpr { Hex(column.expr) }
def unhex(column: Column): Column = withExpr { Unhex(column.expr) }

def hypot(l: Column, r: Column): Column = withExpr { Hypot(l.expr, r.expr) }
def hypot(l: Column, rightName: String): Column = hypot(l, Column(rightName))
def hypot(leftName: String, r: Column): Column = hypot(Column(leftName), r)
def hypot(leftName: String, rightName: String): Column = hypot(Column(leftName), Column(rightName))
def hypot(l: Column, r: Double): Column = hypot(l, lit(r))
def hypot(leftName: String, r: Double): Column = hypot(Column(leftName), r)
def hypot(l: Double, r: Column): Column = hypot(lit(l), r)
def hypot(l: Double, rightName: String): Column = hypot(l, Column(rightName))

@scala.annotation.varargs
def least(exprs: Column*): Column = withExpr { Least(exprs.map(_.expr)) }

@scala.annotation.varargs
def least(columnName: String, columnNames: String*): Column = {
  least((columnName +: columnNames).map(Column.apply): _*)
}

def log(e: Column): Column = withExpr { Log(e.expr) }
def log(columnName: String): Column = log(Column(columnName))
def log(base: Double, a: Column): Column = withExpr { Logarithm(lit(base).expr, a.expr) }
def log(base: Double, columnName: String): Column = log(base, Column(columnName))

def log10(e: Column): Column = withExpr { Log10(e.expr) }
def log10(columnName: String): Column = log10(Column(columnName))

def log1p(e: Column): Column = withExpr { Log1p(e.expr) }
def log1p(columnName: String): Column = log1p(Column(columnName))

def log2(expr: Column): Column = withExpr { Log2(expr.expr) }
def log2(columnName: String): Column = log2(Column(columnName))

def pow(l: Column, r: Column): Column = withExpr { Pow(l.expr, r.expr) }
def pow(l: Column, rightName: String): Column = pow(l, Column(rightName))
def pow(leftName: String, r: Column): Column = pow(Column(leftName), r)
def pow(leftName: String, rightName: String): Column = pow(Column(leftName), Column(rightName))
def pow(l: Column, r: Double): Column = pow(l, lit(r))
def pow(leftName: String, r: Double): Column = pow(Column(leftName), r)
def pow(l: Double, r: Column): Column = pow(lit(l), r)
def pow(l: Double, rightName: String): Column = pow(l, Column(rightName))

def pmod(dividend: Column, divisor: Column): Column = withExpr {
  Pmod(dividend.expr, divisor.expr)
}

def rint(e: Column): Column = withExpr { Rint(e.expr) }
def rint(columnName: String): Column = rint(Column(columnName))

def round(e: Column): Column = round(e, 0)
def round(e: Column, scale: Int): Column = withExpr { Round(e.expr, Literal(scale)) }

def bround(e: Column): Column = bround(e, 0)
def bround(e: Column, scale: Int): Column = withExpr { BRound(e.expr, Literal(scale)) }

def shiftLeft(e: Column, numBits: Int): Column = withExpr { ShiftLeft(e.expr, lit(numBits).expr) }

def shiftRight(e: Column, numBits: Int): Column = withExpr {
  ShiftRight(e.expr, lit(numBits).expr)
}

def shiftRightUnsigned(e: Column, numBits: Int): Column = withExpr {
  ShiftRightUnsigned(e.expr, lit(numBits).expr)
}

def signum(e: Column): Column = withExpr { Signum(e.expr) }
def signum(columnName: String): Column = signum(Column(columnName))

def sin(e: Column): Column = withExpr { Sin(e.expr) }
def sin(columnName: String): Column = sin(Column(columnName))

def sinh(e: Column): Column = withExpr { Sinh(e.expr) }
def sinh(columnName: String): Column = sinh(Column(columnName))

def tan(e: Column): Column = withExpr { Tan(e.expr) }
def tan(columnName: String): Column = tan(Column(columnName))

def tanh(e: Column): Column = withExpr { Tanh(e.expr) }
def tanh(columnName: String): Column = tanh(Column(columnName))

@deprecated("Use degrees", "2.1.0")
def toDegrees(e: Column): Column = degrees(e)

@deprecated("Use degrees", "2.1.0")
def toDegrees(columnName: String): Column = degrees(Column(columnName))

def degrees(e: Column): Column = withExpr { ToDegrees(e.expr) }
def degrees(columnName: String): Column = degrees(Column(columnName))

@deprecated("Use radians", "2.1.0")
def toRadians(e: Column): Column = radians(e)

@deprecated("Use radians", "2.1.0")
def toRadians(columnName: String): Column = radians(Column(columnName))

def radians(e: Column): Column = withExpr { ToRadians(e.expr) }
def radians(columnName: String): Column = radians(Column(columnName))
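A brief sketch of a few math functions; the numbers are arbitrary sample values.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("math-demo").getOrCreate()
import spark.implicits._

val nums = Seq(-1.5, 2.345, 9.0).toDF("x")

nums.select(
  col("x"),
  abs(col("x")).as("abs_x"),
  round(col("x"), 1).as("round_1"),
  pow(col("x"), 2.0).as("x_squared"),
  sqrt(abs(col("x"))).as("sqrt_abs_x")
).show()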
5. Misc functions
def md5(e: Column): Column = withExpr { Md5(e.expr) }

def sha1(e: Column): Column = withExpr { Sha1(e.expr) }

def sha2(e: Column, numBits: Int): Column = {
  require(Seq(0, 224, 256, 384, 512).contains(numBits),
    s"numBits $numBits is not in the permitted values (0, 224, 256, 384, 512)")
  withExpr { Sha2(e.expr, lit(numBits).expr) }
}

def crc32(e: Column): Column = withExpr { Crc32(e.expr) }

@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
  new Murmur3Hash(cols.map(_.expr))
}

@scala.annotation.varargs
def xxhash64(cols: Column*): Column = withExpr {
  new XxHash64(cols.map(_.expr))
}
6. String functions
def ascii(e: Column): Column = withExpr { Ascii(e.expr) }

def base64(e: Column): Column = withExpr { Base64(e.expr) }

@scala.annotation.varargs
def concat_ws(sep: String, exprs: Column*): Column = withExpr {
  ConcatWs(Literal.create(sep, StringType) +: exprs.map(_.expr))
}

def decode(value: Column, charset: String): Column = withExpr {
  Decode(value.expr, lit(charset).expr)
}

def encode(value: Column, charset: String): Column = withExpr {
  Encode(value.expr, lit(charset).expr)
}

def format_number(x: Column, d: Int): Column = withExpr {
  FormatNumber(x.expr, lit(d).expr)
}

@scala.annotation.varargs
def format_string(format: String, arguments: Column*): Column = withExpr {
  FormatString((lit(format) +: arguments).map(_.expr): _*)
}

def initcap(e: Column): Column = withExpr { InitCap(e.expr) }

def instr(str: Column, substring: String): Column = withExpr {
  StringInstr(str.expr, lit(substring).expr)
}

def length(e: Column): Column = withExpr { Length(e.expr) }

def lower(e: Column): Column = withExpr { Lower(e.expr) }

def levenshtein(l: Column, r: Column): Column = withExpr { Levenshtein(l.expr, r.expr) }

def locate(substr: String, str: Column): Column = withExpr {
  new StringLocate(lit(substr).expr, str.expr)
}

def locate(substr: String, str: Column, pos: Int): Column = withExpr {
  StringLocate(lit(substr).expr, str.expr, lit(pos).expr)
}

def lpad(str: Column, len: Int, pad: String): Column = withExpr {
  StringLPad(str.expr, lit(len).expr, lit(pad).expr)
}

def ltrim(e: Column): Column = withExpr { StringTrimLeft(e.expr) }

def ltrim(e: Column, trimString: String): Column = withExpr {
  StringTrimLeft(e.expr, Literal(trimString))
}

def regexp_extract(e: Column, exp: String, groupIdx: Int): Column = withExpr {
  RegExpExtract(e.expr, lit(exp).expr, lit(groupIdx).expr)
}

def regexp_replace(e: Column, pattern: String, replacement: String): Column = withExpr {
  RegExpReplace(e.expr, lit(pattern).expr, lit(replacement).expr)
}

def regexp_replace(e: Column, pattern: Column, replacement: Column): Column = withExpr {
  RegExpReplace(e.expr, pattern.expr, replacement.expr)
}

def unbase64(e: Column): Column = withExpr { UnBase64(e.expr) }

def rpad(str: Column, len: Int, pad: String): Column = withExpr {
  StringRPad(str.expr, lit(len).expr, lit(pad).expr)
}

def repeat(str: Column, n: Int): Column = withExpr {
  StringRepeat(str.expr, lit(n).expr)
}

def rtrim(e: Column): Column = withExpr { StringTrimRight(e.expr) }

def rtrim(e: Column, trimString: String): Column = withExpr {
  StringTrimRight(e.expr, Literal(trimString))
}

def soundex(e: Column): Column = withExpr { SoundEx(e.expr) }

def split(str: Column, pattern: String): Column = withExpr {
  StringSplit(str.expr, Literal(pattern), Literal(-1))
}

def split(str: Column, pattern: String, limit: Int): Column = withExpr {
  StringSplit(str.expr, Literal(pattern), Literal(limit))
}

def substring(str: Column, pos: Int, len: Int): Column = withExpr {
  Substring(str.expr, lit(pos).expr, lit(len).expr)
}

def substring_index(str: Column, delim: String, count: Int): Column = withExpr {
  SubstringIndex(str.expr, lit(delim).expr, lit(count).expr)
}

def overlay(src: Column, replace: Column, pos: Column, len: Column): Column = withExpr {
  Overlay(src.expr, replace.expr, pos.expr, len.expr)
}

def overlay(src: Column, replace: Column, pos: Column): Column = withExpr {
  new Overlay(src.expr, replace.expr, pos.expr)
}

def translate(src: Column, matchingString: String, replaceString: String): Column = withExpr {
  StringTranslate(src.expr, lit(matchingString).expr, lit(replaceString).expr)
}

def trim(e: Column): Column = withExpr { StringTrim(e.expr) }

def trim(e: Column, trimString: String): Column = withExpr {
  StringTrim(e.expr, Literal(trimString))
}

def upper(e: Column): Column = withExpr { Upper(e.expr) }
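A short sketch of common string functions; the day/msg sample data is made up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("string-demo").getOrCreate()
import spark.implicits._

val logs = Seq(("2020-10-01", " spark SQL "), ("2020-10-02", "functions")).toDF("day", "msg")

logs.select(
  concat_ws("-", col("day"), col("msg")).as("joined"),
  upper(trim(col("msg"))).as("clean_upper"),
  regexp_replace(col("msg"), "\\s+", "_").as("no_spaces"),
  substring(col("msg"), 1, 5).as("prefix"),
  split(col("day"), "-").as("day_parts")
).show(truncate = false)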
7. DateTime functions
def add_months(startDate: Column, numMonths: Int): Column = add_months(startDate, lit(numMonths))

def add_months(startDate: Column, numMonths: Column): Column = withExpr {
  AddMonths(startDate.expr, numMonths.expr)
}

def current_date(): Column = withExpr { CurrentDate() }

def current_timestamp(): Column = withExpr { CurrentTimestamp() }

def date_format(dateExpr: Column, format: String): Column = withExpr {
  DateFormatClass(dateExpr.expr, Literal(format))
}

def date_add(start: Column, days: Int): Column = date_add(start, lit(days))
def date_add(start: Column, days: Column): Column = withExpr { DateAdd(start.expr, days.expr) }

def date_sub(start: Column, days: Int): Column = date_sub(start, lit(days))
def date_sub(start: Column, days: Column): Column = withExpr { DateSub(start.expr, days.expr) }

def datediff(end: Column, start: Column): Column = withExpr { DateDiff(end.expr, start.expr) }

def year(e: Column): Column = withExpr { Year(e.expr) }
def quarter(e: Column): Column = withExpr { Quarter(e.expr) }
def month(e: Column): Column = withExpr { Month(e.expr) }
def dayofweek(e: Column): Column = withExpr { DayOfWeek(e.expr) }
def dayofmonth(e: Column): Column = withExpr { DayOfMonth(e.expr) }
def dayofyear(e: Column): Column = withExpr { DayOfYear(e.expr) }
def hour(e: Column): Column = withExpr { Hour(e.expr) }
def last_day(e: Column): Column = withExpr { LastDay(e.expr) }
def minute(e: Column): Column = withExpr { Minute(e.expr) }

def months_between(end: Column, start: Column): Column = withExpr {
  new MonthsBetween(end.expr, start.expr)
}

def months_between(end: Column, start: Column, roundOff: Boolean): Column = withExpr {
  MonthsBetween(end.expr, start.expr, lit(roundOff).expr)
}

def next_day(date: Column, dayOfWeek: String): Column = withExpr {
  NextDay(date.expr, lit(dayOfWeek).expr)
}

def second(e: Column): Column = withExpr { Second(e.expr) }
def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }

def from_unixtime(ut: Column): Column = withExpr {
  FromUnixTime(ut.expr, Literal(TimestampFormatter.defaultPattern))
}

def from_unixtime(ut: Column, f: String): Column = withExpr {
  FromUnixTime(ut.expr, Literal(f))
}

def unix_timestamp(): Column = withExpr {
  UnixTimestamp(CurrentTimestamp(), Literal(TimestampFormatter.defaultPattern))
}

def unix_timestamp(s: Column): Column = withExpr {
  UnixTimestamp(s.expr, Literal(TimestampFormatter.defaultPattern))
}

def unix_timestamp(s: Column, p: String): Column = withExpr { UnixTimestamp(s.expr, Literal(p)) }

def to_timestamp(s: Column): Column = withExpr {
  new ParseToTimestamp(s.expr)
}

def to_timestamp(s: Column, fmt: String): Column = withExpr {
  new ParseToTimestamp(s.expr, Literal(fmt))
}

def to_date(e: Column): Column = withExpr { new ParseToDate(e.expr) }

def to_date(e: Column, fmt: String): Column = withExpr {
  new ParseToDate(e.expr, Literal(fmt))
}

def trunc(date: Column, format: String): Column = withExpr {
  TruncDate(date.expr, Literal(format))
}

def date_trunc(format: String, timestamp: Column): Column = withExpr {
  TruncTimestamp(Literal(format), timestamp.expr)
}

def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
  FromUTCTimestamp(ts.expr, Literal(tz))
}

def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
  FromUTCTimestamp(ts.expr, tz.expr)
}

def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
  ToUTCTimestamp(ts.expr, Literal(tz))
}

def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
  ToUTCTimestamp(ts.expr, tz.expr)
}
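A short sketch of the date/time helpers; the timestamp and date strings are invented sample values.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("date-demo").getOrCreate()
import spark.implicits._

val events = Seq(("2020-10-01 12:30:00", "2020-11-15")).toDF("ts", "d")

events.select(
  to_timestamp(col("ts")).as("event_ts"),
  to_date(col("d")).as("event_date"),
  date_format(to_date(col("d")), "yyyy/MM/dd").as("formatted"),
  datediff(to_date(col("d")), to_date(col("ts"))).as("days_between"),
  add_months(to_date(col("d")), 2).as("plus_two_months"),
  unix_timestamp(col("ts")).as("epoch_seconds")
).show(truncate = false)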
8. Window function
This is the function used in the DSL-style API to group rows into time windows.
def window(
    timeColumn: Column,
    windowDuration: String,
    slideDuration: String,
    startTime: String): Column = {
  withExpr {
    TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)
  }.as("window")
}

def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
  window(timeColumn, windowDuration, slideDuration, "0 second")
}

def window(timeColumn: Column, windowDuration: String): Column = {
  window(timeColumn, windowDuration, windowDuration, "0 second")
}
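A sketch of the time-window function used with groupBy; it works the same way on streaming DataFrames, and the batch click data here is only illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("time-window-demo").getOrCreate()
import spark.implicits._

val clicks = Seq(
  ("2020-10-01 10:00:10", 1),
  ("2020-10-01 10:03:20", 1),
  ("2020-10-01 10:07:45", 1)
).toDF("time", "cnt").withColumn("time", to_timestamp(col("time")))

// Count events in 5-minute tumbling windows over the event-time column.
clicks
  .groupBy(window(col("time"), "5 minutes"))
  .agg(sum("cnt").as("clicks"))
  .orderBy("window")
  .show(truncate = false)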
9. Collection functions
def array_contains(column: Column, value: Any): Column = withExpr {
  ArrayContains(column.expr, lit(value).expr)
}

def arrays_overlap(a1: Column, a2: Column): Column = withExpr {
  ArraysOverlap(a1.expr, a2.expr)
}

def slice(x: Column, start: Int, length: Int): Column = withExpr {
  Slice(x.expr, Literal(start), Literal(length))
}

def array_join(column: Column, delimiter: String, nullReplacement: String): Column = withExpr {
  ArrayJoin(column.expr, Literal(delimiter), Some(Literal(nullReplacement)))
}

def array_join(column: Column, delimiter: String): Column = withExpr {
  ArrayJoin(column.expr, Literal(delimiter), None)
}

@scala.annotation.varargs
def concat(exprs: Column*): Column = withExpr { Concat(exprs.map(_.expr)) }

def array_position(column: Column, value: Any): Column = withExpr {
  ArrayPosition(column.expr, lit(value).expr)
}

def element_at(column: Column, value: Any): Column = withExpr {
  ElementAt(column.expr, lit(value).expr)
}

def array_sort(e: Column): Column = withExpr { new ArraySort(e.expr) }

def array_remove(column: Column, element: Any): Column = withExpr {
  ArrayRemove(column.expr, lit(element).expr)
}

def array_distinct(e: Column): Column = withExpr { ArrayDistinct(e.expr) }

def array_intersect(col1: Column, col2: Column): Column = withExpr {
  ArrayIntersect(col1.expr, col2.expr)
}

def array_union(col1: Column, col2: Column): Column = withExpr {
  ArrayUnion(col1.expr, col2.expr)
}

def array_except(col1: Column, col2: Column): Column = withExpr {
  ArrayExcept(col1.expr, col2.expr)
}
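A small sketch of the basic array helpers; the two array columns are invented sample data.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("collection-demo").getOrCreate()
import spark.implicits._

val arrays = Seq((Seq(1, 2, 2, 3), Seq(3, 4))).toDF("a", "b")

arrays.select(
  array_contains(col("a"), 2).as("has_2"),
  array_distinct(col("a")).as("distinct_a"),
  array_union(col("a"), col("b")).as("a_union_b"),
  element_at(col("a"), 1).as("first_elem"),
  size(col("a")).as("len_a")
).show(truncate = false)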
private def createLambda(f: Column => Column) = {
  val x = UnresolvedNamedLambdaVariable(Seq("x"))
  val function = f(Column(x)).expr
  LambdaFunction(function, Seq(x))
}

private def createLambda(f: (Column, Column) => Column) = {
  val x = UnresolvedNamedLambdaVariable(Seq("x"))
  val y = UnresolvedNamedLambdaVariable(Seq("y"))
  val function = f(Column(x), Column(y)).expr
  LambdaFunction(function, Seq(x, y))
}

private def createLambda(f: (Column, Column, Column) => Column) = {
  val x = UnresolvedNamedLambdaVariable(Seq("x"))
  val y = UnresolvedNamedLambdaVariable(Seq("y"))
  val z = UnresolvedNamedLambdaVariable(Seq("z"))
  val function = f(Column(x), Column(y), Column(z)).expr
  LambdaFunction(function, Seq(x, y, z))
}

def transform(column: Column, f: Column => Column): Column = withExpr {
  ArrayTransform(column.expr, createLambda(f))
}

def transform(column: Column, f: (Column, Column) => Column): Column = withExpr {
  ArrayTransform(column.expr, createLambda(f))
}

def exists(column: Column, f: Column => Column): Column = withExpr {
  ArrayExists(column.expr, createLambda(f))
}

def forall(column: Column, f: Column => Column): Column = withExpr {
  ArrayForAll(column.expr, createLambda(f))
}

def filter(column: Column, f: Column => Column): Column = withExpr {
  ArrayFilter(column.expr, createLambda(f))
}

def filter(column: Column, f: (Column, Column) => Column): Column = withExpr {
  ArrayFilter(column.expr, createLambda(f))
}

def aggregate(
    expr: Column,
    initialValue: Column,
    merge: (Column, Column) => Column,
    finish: Column => Column): Column = withExpr {
  ArrayAggregate(
    expr.expr,
    initialValue.expr,
    createLambda(merge),
    createLambda(finish)
  )
}

def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
  aggregate(expr, initialValue, merge, c => c)

def zip_with(left: Column, right: Column, f: (Column, Column) => Column): Column = withExpr {
  ZipWith(left.expr, right.expr, createLambda(f))
}

def transform_keys(expr: Column, f: (Column, Column) => Column): Column = withExpr {
  TransformKeys(expr.expr, createLambda(f))
}

def transform_values(expr: Column, f: (Column, Column) => Column): Column = withExpr {
  TransformValues(expr.expr, createLambda(f))
}

def map_filter(expr: Column, f: (Column, Column) => Column): Column = withExpr {
  MapFilter(expr.expr, createLambda(f))
}

def map_zip_with(
    left: Column,
    right: Column,
    f: (Column, Column, Column) => Column): Column = withExpr {
  MapZipWith(left.expr, right.expr, createLambda(f))
}
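The higher-order functions above take Scala lambdas that createLambda turns into Catalyst LambdaFunctions. A usage sketch (the sample arrays are invented):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("hof-demo").getOrCreate()
import spark.implicits._

val arrays = Seq(Seq(1, 2, 3, 4), Seq(5, 6)).toDF("xs")

arrays.select(
  col("xs"),
  transform(col("xs"), x => x * 2).as("doubled"),
  filter(col("xs"), x => x % 2 === 0).as("evens"),
  exists(col("xs"), x => x > 3).as("has_gt_3"),
  aggregate(col("xs"), lit(0), (acc, x) => acc + x).as("sum_xs")
).show(truncate = false)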
def explode(e: Column): Column = withExpr { Explode(e.expr) }

def explode_outer(e: Column): Column = withExpr { GeneratorOuter(Explode(e.expr)) }

def posexplode(e: Column): Column = withExpr { PosExplode(e.expr) }

def posexplode_outer(e: Column): Column = withExpr { GeneratorOuter(PosExplode(e.expr)) }

def get_json_object(e: Column, path: String): Column = withExpr {
  GetJsonObject(e.expr, lit(path).expr)
}

@scala.annotation.varargs
def json_tuple(json: Column, fields: String*): Column = withExpr {
  require(fields.nonEmpty, "at least 1 field name should be given.")
  JsonTuple(json.expr +: fields.map(Literal.apply))
}

def from_json(e: Column, schema: StructType, options: Map[String, String]): Column =
  from_json(e, schema.asInstanceOf[DataType], options)

def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
  JsonToStructs(schema, options, e.expr)
}

def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
  from_json(e, schema, options.asScala.toMap)

def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column =
  from_json(e, schema, options.asScala.toMap)

def from_json(e: Column, schema: StructType): Column =
  from_json(e, schema, Map.empty[String, String])

def from_json(e: Column, schema: DataType): Column =
  from_json(e, schema, Map.empty[String, String])

def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
  from_json(e, schema, options.asScala.toMap)
}

def from_json(e: Column, schema: String, options: Map[String, String]): Column = {
  val dataType = try {
    DataType.fromJson(schema)
  } catch {
    case NonFatal(_) => DataType.fromDDL(schema)
  }
  from_json(e, dataType, options)
}

def from_json(e: Column, schema: Column): Column = {
  from_json(e, schema, Map.empty[String, String].asJava)
}

def from_json(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
  withExpr(new JsonToStructs(e.expr, schema.expr, options.asScala.toMap))
}

def schema_of_json(json: String): Column = schema_of_json(lit(json))

def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))

def schema_of_json(json: Column, options: java.util.Map[String, String]): Column = {
  withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
}

def to_json(e: Column, options: Map[String, String]): Column = withExpr {
  StructsToJson(options, e.expr)
}

def to_json(e: Column, options: java.util.Map[String, String]): Column =
  to_json(e, options.asScala.toMap)

def to_json(e: Column): Column =
  to_json(e, Map.empty[String, String])
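A sketch of parsing and producing JSON columns; the JSON string and schema below are invented for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("json-demo").getOrCreate()
import spark.implicits._

val raw = Seq("""{"name":"alice","age":30}""").toDF("json")

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

raw.select(
  from_json(col("json"), schema).as("parsed"),
  get_json_object(col("json"), "$.name").as("name_only")
).select(
  col("parsed.name"), col("parsed.age"), col("name_only"),
  to_json(struct(col("parsed.name"), col("parsed.age"))).as("re_encoded")
).show(truncate = false)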
def size(e: Column): Column = withExpr { Size(e.expr) }

def sort_array(e: Column): Column = sort_array(e, asc = true)

def sort_array(e: Column, asc: Boolean): Column = withExpr { SortArray(e.expr, lit(asc).expr) }

def array_min(e: Column): Column = withExpr { ArrayMin(e.expr) }
def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }

def shuffle(e: Column): Column = withExpr { Shuffle(e.expr) }

def reverse(e: Column): Column = withExpr { Reverse(e.expr) }

def flatten(e: Column): Column = withExpr { Flatten(e.expr) }

def sequence(start: Column, stop: Column, step: Column): Column = withExpr {
  new Sequence(start.expr, stop.expr, step.expr)
}

def sequence(start: Column, stop: Column): Column = withExpr {
  new Sequence(start.expr, stop.expr)
}

def array_repeat(left: Column, right: Column): Column = withExpr {
  ArrayRepeat(left.expr, right.expr)
}

def array_repeat(e: Column, count: Int): Column = array_repeat(e, lit(count))

def map_keys(e: Column): Column = withExpr { MapKeys(e.expr) }
def map_values(e: Column): Column = withExpr { MapValues(e.expr) }
def map_entries(e: Column): Column = withExpr { MapEntries(e.expr) }
def map_from_entries(e: Column): Column = withExpr { MapFromEntries(e.expr) }

@scala.annotation.varargs
def arrays_zip(e: Column*): Column = withExpr { ArraysZip(e.map(_.expr)) }

@scala.annotation.varargs
def map_concat(cols: Column*): Column = withExpr { MapConcat(cols.map(_.expr)) }

def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
  CsvToStructs(schema, options, e.expr)
}

def from_csv(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
  withExpr(new CsvToStructs(e.expr, schema.expr, options.asScala.toMap))
}

def schema_of_csv(csv: String): Column = schema_of_csv(lit(csv))

def schema_of_csv(csv: Column): Column = withExpr(new SchemaOfCsv(csv.expr))

def schema_of_csv(csv: Column, options: java.util.Map[String, String]): Column = {
  withExpr(SchemaOfCsv(csv.expr, options.asScala.toMap))
}

def to_csv(e: Column, options: java.util.Map[String, String]): Column = withExpr {
  StructsToCsv(options.asScala.toMap, e.expr)
}

def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
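A sketch of round-tripping CSV strings with from_csv/to_csv; the lines and schema are invented for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("csv-demo").getOrCreate()
import spark.implicits._

val lines = Seq("alice,30", "bob,25").toDF("csv")

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

lines.select(
  from_csv(col("csv"), schema, Map.empty[String, String]).as("parsed")
).select(
  col("parsed.name"), col("parsed.age"),
  to_csv(struct(col("parsed.name"), col("parsed.age"))).as("round_trip")
).show(truncate = false)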
def years(e: Column): Column = withExpr { Years(e.expr) }
def months(e: Column): Column = withExpr { Months(e.expr) }
def days(e: Column): Column = withExpr { Days(e.expr) }
def hours(e: Column): Column = withExpr { Hours(e.expr) }

def bucket(numBuckets: Column, e: Column): Column = withExpr {
  numBuckets.expr match {
    case lit @ Literal(_, IntegerType) =>
      Bucket(lit, e.expr)
    case _ =>
      throw new AnalysisException(s"Invalid number of buckets: bucket($numBuckets, $e)")
  }
}

def bucket(numBuckets: Int, e: Column): Column = withExpr {
  Bucket(Literal(numBuckets), e.expr)
}
(The fragment below is an excerpt of the commented-out code-generation script inside the source file; it is the template that produces the udf overloads listed in the next two sections, and this copy happens to start partway through the Scala udf template.)

|def udf[$typeTags](f: Function$x[$types]): UserDefinedFunction = {
|  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
|  val inputEncoders = $inputEncoders
|  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
|  if (nullable) udf else udf.asNonNullable()
|}""".stripMargin)
}

(0 to 10).foreach { i =>
  val extTypeArgs = (0 to i).map(_ => "_").mkString(", ")
  val anyTypeArgs = (0 to i).map(_ => "Any").mkString(", ")
  val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs]]"
  val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
  val funcCall = if (i == 0) s"() => f$anyCast.call($anyParams)" else s"f$anyCast.call($anyParams)"
  println(s"""
    |
    |def udf(f: UDF$i[$extTypeArgs], returnType: DataType): UserDefinedFunction = {
    |  val func = $funcCall
    |  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill($i)(None))
    |}""".stripMargin)
}
*/
10. Scala UDF functions
def udaf[IN: TypeTag, BUF, OUT](agg: Aggregator[IN, BUF, OUT]): UserDefinedFunction = {
  udaf(agg, ExpressionEncoder[IN]())
}

def udaf[IN, BUF, OUT](
    agg: Aggregator[IN, BUF, OUT],
    inputEncoder: Encoder[IN]): UserDefinedFunction = {
  UserDefinedAggregator(agg, inputEncoder)
}

def udf[RT: TypeTag](f: Function0[RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag](
    f: Function3[A1, A2, A3, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag](
    f: Function4[A1, A2, A3, A4, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag](
    f: Function5[A1, A2, A3, A4, A5, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag,
    A6: TypeTag](f: Function6[A1, A2, A3, A4, A5, A6, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption ::
    Try(ExpressionEncoder[A6]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag,
    A6: TypeTag, A7: TypeTag](f: Function7[A1, A2, A3, A4, A5, A6, A7, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption ::
    Try(ExpressionEncoder[A6]()).toOption ::
    Try(ExpressionEncoder[A7]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag,
    A6: TypeTag, A7: TypeTag, A8: TypeTag](
    f: Function8[A1, A2, A3, A4, A5, A6, A7, A8, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption ::
    Try(ExpressionEncoder[A6]()).toOption ::
    Try(ExpressionEncoder[A7]()).toOption ::
    Try(ExpressionEncoder[A8]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag,
    A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag](
    f: Function9[A1, A2, A3, A4, A5, A6, A7, A8, A9, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption ::
    Try(ExpressionEncoder[A6]()).toOption ::
    Try(ExpressionEncoder[A7]()).toOption ::
    Try(ExpressionEncoder[A8]()).toOption ::
    Try(ExpressionEncoder[A9]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}

def udf[RT: TypeTag, A1: TypeTag, A2: TypeTag, A3: TypeTag, A4: TypeTag, A5: TypeTag,
    A6: TypeTag, A7: TypeTag, A8: TypeTag, A9: TypeTag, A10: TypeTag](
    f: Function10[A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT]): UserDefinedFunction = {
  val ScalaReflection.Schema(dataType, nullable) = ScalaReflection.schemaFor[RT]
  val inputEncoders = Try(ExpressionEncoder[A1]()).toOption ::
    Try(ExpressionEncoder[A2]()).toOption ::
    Try(ExpressionEncoder[A3]()).toOption ::
    Try(ExpressionEncoder[A4]()).toOption ::
    Try(ExpressionEncoder[A5]()).toOption ::
    Try(ExpressionEncoder[A6]()).toOption ::
    Try(ExpressionEncoder[A7]()).toOption ::
    Try(ExpressionEncoder[A8]()).toOption ::
    Try(ExpressionEncoder[A9]()).toOption ::
    Try(ExpressionEncoder[A10]()).toOption :: Nil
  val udf = SparkUserDefinedFunction(f, dataType, inputEncoders)
  if (nullable) udf else udf.asNonNullable()
}
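A minimal usage sketch of a typed Scala UDF; the word data and the function itself are invented for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("udf-demo").getOrCreate()
import spark.implicits._

val words = Seq("spark", "sql").toDF("word")

// Typed Scala UDF: input and return types are inferred from the function.
val lenPlusOne = udf((s: String) => s.length + 1)

words.select(col("word"), lenPlusOne(col("word")).as("len_plus_one")).show()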
11. Java UDF functions
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = () => f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(0)(None))
}

def udf(f: UDF1[_, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(1)(None))
}

def udf(f: UDF2[_, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF2[Any, Any, Any]].call(_: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(2)(None))
}

def udf(f: UDF3[_, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF3[Any, Any, Any, Any]].call(_: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(3)(None))
}

def udf(f: UDF4[_, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF4[Any, Any, Any, Any, Any]].call(_: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(4)(None))
}

def udf(f: UDF5[_, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF5[Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(5)(None))
}

def udf(f: UDF6[_, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF6[Any, Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(6)(None))
}

def udf(f: UDF7[_, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF7[Any, Any, Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(7)(None))
}

def udf(f: UDF8[_, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF8[Any, Any, Any, Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(8)(None))
}

def udf(f: UDF9[_, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF9[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(9)(None))
}

def udf(f: UDF10[_, _, _, _, _, _, _, _, _, _, _], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF10[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]
    .call(_: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any, _: Any)
  SparkUserDefinedFunction(func, returnType, inputEncoders = Seq.fill(10)(None))
}
@deprecated("Scala `udf` method with return type parameter is deprecated. " +
"Please use Scala `udf` method without return type parameter.", "3.0.0")
def udf
(f
: AnyRef, dataType
: DataType
): UserDefinedFunction
= {
if (!SQLConf
.get
.getConf
(SQLConf
.LEGACY_ALLOW_UNTYPED_SCALA_UDF
)) {
val errorMsg
= "You're using untyped Scala UDF, which does not have the input type " +
"information. Spark may blindly pass null to the Scala closure with primitive-type " +
"argument, and the closure will see the default value of the Java type for the null " +
"argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. " +
"To get rid of this error, you could:\n" +
"1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n" +
"2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " +
"override def call(s: String): Integer = s.length() }, IntegerType)`, " +
"if input types are all non primitive\n" +
s
"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " +
s
"use this API with caution"
throw new AnalysisException
(errorMsg
)
}
SparkUserDefinedFunction
(f
, dataType
, inputEncoders
= Nil
)
}
@scala.annotation
.varargs
def callUDF
(udfName
: String, cols
: Column
*): Column
= withExpr
{
UnresolvedFunction
(udfName
, cols
.map
(_
.expr
), isDistinct
= false)
}
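callUDF invokes a function that was registered by name (for example via spark.udf.register), which also makes it available from SQL. A small sketch with an invented UDF and data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("calludf-demo").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("n")

// Register a UDF under a name, then invoke it via callUDF (or directly in SQL).
spark.udf.register("square", (n: Int) => n * n)

df.select(col("n"), callUDF("square", col("n")).as("n_squared")).show()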
12. Summary
These functions are meant to be used with DataFrames and Datasets in Spark SQL, and they cover both the SQL-style and the DSL-style APIs. They overlap with many of the built-in Spark SQL functions, so when in doubt about a Spark SQL function, the source code collected here is a good place to look.