spark算子、DF、hive、hbase操作

    科技2025-10-08  2

    一、rdd,df使用。hive,hbase表建立。

    –使用rdd和sparkSQL业务查询 sparkSQL创建datafrom: 1.数据准备 请在 HDFS 中创建目录/app/data/exam,并将 meituan_waimai_meishi.csv 文件传到该 目录。 –通过 HDFS 命令查询出文档有多少行数据。 hdfs dfs -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l 2.使用 Spark,加载 HDFS 文件系统 meituan_waimai_meishi.csv 文件,并分别使用 RDD 和 Spark SQL 完成以下分析,不考虑数据去重。 –加载数据使用rdd。 val rdd=sc.textFile(“hdfs:///app/data/exam/meituan_waimai_meishi.csv”) –去表头 val rdd1=rdd.filter(x=>(!x.startsWith(“spu_id”))) –过滤脏数据,按",“分隔 val rdd2=rdd1.map(.split(",",-1)).filter(x=>x.size==12) –①统计每个店铺分别有多少商品(SPU)。 rdd2.map(x=>(x(2),1)).reduceByKey(+).collect –②统计每个店铺的总销售额。 rdd2.map(x=>(x(2),x(5).toDouble * x(7).toDouble)).reduceByKey(+_).collect rdd2.map(x=>(x(2),x(5).toDouble * x(7).toDouble)).groupBy(x=>(x._1)).map(x=>(x._1,x._2.map(x=>x._2))).map(x=> (x._1,x._2.sum)).collect.foreach(println) –③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 –中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。 rdd2.map(x=>(x(2),x(4),x(5).toDouble * x(7).toDouble)).filter(x=>x._3!=0).groupBy(x=>x._1).map(x=>{ val shop_name=x._1; val res=x.2.toList.sortBy(-1*._3).take(3).map(item=>{item._2+”,"+item._3}); (shop_name,res)}).collect.foreach(println)

    使用df处理 –转成df,且数据类型没有全部是string(.option(“inferSchema”,“true”)可以是数据类型不变成string) val df=spark.read.format(“csv”).option(“header”,true).option(“inferSchema”,“true”) .load(“hdfs:///app/data/exam/meituan_waimai_meishi.csv”) –将一列的数据类型转成其他的 df.withColumn(“spu_price”,$“spu_price”.cast(DataTypes.DoubleType)) –导入SPARKSQL所需要的包 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ –创建表 df.createOrReplaceTempView(“spu”) –查询语句 1、spark.sql(“select shop_name,count(1) from spu group by shop_name”).show 2、spark.sql("select shop_name,sum(spu_pricemonth_sales) as xiaoshou from spu group by shop_name").show 3、select t.shop_name,t.spu_name,t.sales,t.rn from(select shop_name,spu_name,spu_pricemonth_sales as sales,row_number() over(partition by shop_name order by spu_price*month_sales desc) as rn from spu where month_sales!=0) t where t.rn<=3

    创建 HBase 数据表 在 HBase 中创建命名空间(namespace)exam,在该命名空间下创建 spu 表,该表下有1 个列族 result。 create_namespace ‘exam202010’ create ‘exam202010:spu’,‘result’

    请 在 Hive 中 创 建 数 据 库 spu_db , 在 该 数 据 库 中 创 建 外 部 表 ex_spu 指 向 /app/data/exam 下的测试数据 ; create database spu_db;

    create external table if not exists ex_spu( spu_id string, shop_id string, shop_name string, category_name string, spu_name string, spu_price double, spu_originprice double, month_sales int, praise_num int, spu_unit string, spu_desc string, spu_image string ) row format delimited fields terminated by ‘,’ stored as textfile location ‘/app/data/exam/’ tblproperties(“skip.header.line.count”=“1”);

    创建外部表 ex_spu_hbase 映射至 HBase 中的 exam:spu 表的 result 列族 create external table if not exists ex_spu_hbase( key string, sales double, praise int) stored by ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’ with serdeproperties(“hbase.columns.mapping”=":key,result:sales,result:praise") tblproperties(“hbase.table.name”=“exam202010:spu”);

    统计每个店铺的总销售额 sales, 店铺的商品总点赞数 praise,并将 shop_id 和shop_name 的组合作为 RowKey,并将结果映射到 HBase。 insert into ex_spu_hbase select concat(t.shop_id,t.shop_name) as key,t.money,t.praise from (select shop_id,shop_name,sum(spu_price*month_sales) as money,sum(praise_num) as praise from ex_spu group by shop_id,shop_name) t;

    完成统计后,分别在 hive 和 HBase 中查询结果数据。 select * from ex_spu_hbase; scan ‘exam202010:spu’,{LIMIT=>10}

    二、建立hive表,将hbase里的表数据映射至hive中表一,再创建hive表表二,将表一数据插入到表二中。

    – check if users table exists DROP TABLE IF EXISTS ${db}.hb_users;

    – create users table。建立hive表,映射hbase。 CREATE EXTERNAL TABLE ${db}.hb_users(user_id STRING, birth_year INT, gender STRING, locale STRING, location STRING, time_zone STRING, joined_at STRING) STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’ WITH SERDEPROPERTIES (‘hbase.columns.mapping’ = ‘:key, profile:birth_year, profile:gender, region:locale, region:location, region:time_zone, registration:joined_at’) TBLPROPERTIES (‘hbase.table.name’ = ‘events_db:users’);

    – check if users table exists DROP TABLE IF EXISTS ${db}.users;

    – create table users。导入映射的表数据到新表中。 CREATE TABLE ${db}.users STORED AS ORC AS SELECT * FROM ${db}.hb_users;

    – remove the temp table DROP TABLE IF EXISTS ${db}.hb_users;

    Processed: 0.009, SQL: 8