项目结构:
1. 统计访问的 PV(Page View,页面访问量)
package com
.erainm
import org
.apache
.spark
.rdd
.RDD
import org
.apache
.spark
.{SparkConf
, SparkContext
}
/**
 * Counts the total number of page views (PV) in an access log.
 *
 * Every line of the input file is counted as one page view and the total is
 * printed as a single `(PV,<count>)` tuple.
 */
object PV {

  /**
   * Entry point: runs the job with the `local[2]` master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc: SparkContext =
      new SparkContext(new SparkConf().setAppName("PV").setMaster("local[2]"))

    try {
      val dataRDD: RDD[String] =
        sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")

      // Tag every log line with the same key so that reduceByKey sums them all.
      val pvAndOne: RDD[(String, Int)] = dataRDD.map(_ => ("PV", 1))
      val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_ + _)

      // Bring the (at most one) result row to the driver before printing;
      // RDD.foreach(println) runs on the executors, whose stdout is not the
      // driver's when the job is not run locally.
      totalPV.collect().foreach(println)
    } finally {
      // Release the context even when the job above throws.
      sc.stop()
    }
  }
}
2. 统计访问的 UV(Unique Visitor,独立访客数)
package com
.erainm
import org
.apache
.spark
.{SparkConf
, SparkContext
}
import org
.apache
.spark
.rdd
.RDD
/**
 * Counts the unique visitors (UV) in an access log.
 *
 * The first space-separated token of every line is used as the visitor key
 * (presumably the client IP, per the original variable name). Distinct keys
 * are counted and the total is printed as a single `(UV,<count>)` tuple and
 * also written out as a text file.
 */
object UV {

  /**
   * Entry point: runs the job with the `local[2]` master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc: SparkContext =
      new SparkContext(new SparkConf().setAppName("UV").setMaster("local[2]"))

    try {
      val dataRDD: RDD[String] =
        sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")

      // Skip lines that have no usable first token:
      //  - a line made only of spaces splits into an empty array, so indexing
      //    element 0 would throw ArrayIndexOutOfBoundsException;
      //  - an empty line (or one with a leading space) yields "" as its first
      //    token, which would otherwise be counted as one more visitor.
      val ipsRDD: RDD[String] = dataRDD
        .map(_.split(" "))
        .filter(fields => fields.nonEmpty && fields(0).nonEmpty)
        .map(fields => fields(0))

      val distinctUV: RDD[(String, Int)] = ipsRDD.distinct().map(_ => ("UV", 1))

      // The total feeds two actions below (print and save); cache it so the
      // second action reuses the computed result instead of re-running the lineage.
      val totalUVRDD: RDD[(String, Int)] = distinctUV.reduceByKey(_ + _).cache()

      // Print on the driver rather than inside an executor-side foreach.
      totalUVRDD.collect().foreach(println)

      // NOTE(review): saveAsTextFile is expected to fail if this output
      // directory already exists — remove it between runs.
      totalUVRDD.saveAsTextFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/out")
    } finally {
      // Release the context even when the job above throws.
      sc.stop()
    }
  }
}
3. 统计访问量的 TopN(访问次数最多的前 N 项)
package com
.erainm
import org
.apache
.spark
.{SparkConf
, SparkContext
}
import org
.apache
.spark
.rdd
.RDD
/**
 * Prints the five most frequent values of field 10 in an access log.
 *
 * Each line is split on single spaces; field index 10 is used as the key
 * (presumably the referrer URL, per the original variable name). The literal
 * placeholder `"-"` (including its quotes) is excluded. The result is printed
 * as an `ArrayBuffer` of `(key, count)` pairs in descending count order; the
 * relative order of keys with equal counts is unspecified.
 */
object TopN {

  /**
   * Entry point: runs the job with the `local[2]` master.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The app name was "PV" (copy-paste from the PV job); use this job's own
    // name so it is identifiable in the Spark UI and logs.
    val sc: SparkContext =
      new SparkContext(new SparkConf().setAppName("TopN").setMaster("local[2]"))

    try {
      sc.setLogLevel("WARN")

      val dataRDD: RDD[String] =
        sc.textFile("/Users/erainm/Documents/application/dev/workSpace/spark_parent/spark_parent/data/access.log")

      val refUrlAndOne: RDD[(String, Int)] = dataRDD
        .map(_.split(" "))
        // Lines with fewer than 11 fields have no field 10 to read.
        .filter(_.length > 10)
        .map(fields => (fields(10), 1))
        // Drop the "-" placeholder before aggregation so it is neither
        // shuffled nor sorted, instead of filtering it out afterwards.
        .filter { case (refUrl, _) => refUrl != "\"-\"" }

      val resultRDD: RDD[(String, Int)] = refUrlAndOne
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)

      val finalResult: Array[(String, Int)] = resultRDD.take(5)
      println(finalResult.toBuffer)
    } finally {
      // Release the context even when the job above throws.
      sc.stop()
    }
  }
}
转载请注明原文地址:https://blackberry.8miu.com/read-3762.html