当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素。也可以将多个完整的文本文件一次性读取为一个pair RDD, 其中键是文件名,值是文件内容。 val input = sc.textFile("./README.md") 如果传递目录,则将目录下的所有文件读取作为RDD。 文件路径支持通配符。 通过wholeTextFiles()对于大量的小文件读取效率比较高,大文件效果没有那么高。 Spark通过saveAsTextFile() 进行文本文件的输出,该方法接收一个路径,并将 RDD 中的内容都输入到路径对应的文件中。Spark 将传入的路径作为目录对待,会在那个目录下输出多个文件。这样,Spark 就可以从多个节点上并行输出了。 result.saveAsTextFile(outputFile)
scala> sc.textFile("./README.md") res6: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[7] at textFile at <console>:25 scala> val readme = sc.textFile("./README.md") readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[9] at textFile at <console>:24 scala> readme.collect() res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster... scala> readme.saveAsTextFile("hdfs://node01:8020/test")如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
scala> import org.json4s._ import org.json4s._ scala> import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.JsonMethods._ scala> import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization scala> var result = sc.textFile("examples/src/main/resources/people.json") result: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.json MapPartitionsRDD[7] at textFile at <console>:47 scala> implicit val formats = Serialization.formats(ShortTypeHints(List())) formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = org.json4s.Serialization$$anon$1@61f2c1da scala> result.collect() res3: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})如果JSON数据是跨行的,那么只能读入整个文件,然后对每个文件进行解析。 JSON数据的输出主要是通过在输出之前将由结构化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。 说白了还是以文本文件的形式存,只是文本的格式已经在程序中转换为JSON。
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,然后通过将每一行进行解析实现对CSV的读取。 CSV/TSV数据的输出也是需要将结构化RDD通过相关的库转换成字符串RDD,然后使用 Spark 的文本文件 API 写出去。
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。 Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)。
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee"))) data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24 scala> data.saveAsSequenceFile("hdfs://node01:8020/sequdata") scala> val sdata = sc.sequenceFile[Int,String]("hdfs://node01:8020/sequdata/*") sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24 scala> sdata.collect() res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))可以直接调用 saveAsSequenceFile(path) 保存你的PairRDD,它会帮你写出数据。需要键和值能够自动转为Writable类型。
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee"))) data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24 scala> data.saveAsObjectFile("hdfs://node01:8020/objfile") scala> import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD scala> val objrdd =sc.objectFile[(Int,String)]("hdfs://node01:8020/objfile/p*") objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25 scala> objrdd.collect() res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)键类型: 指定[K,V]键值对中K的类型值类型: 指定[K,V]键值对中V的类型分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits。 其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值兼容旧版本HadoopAPI的创建操作
文件路径输入格式键类型值类型分区值textFile(path: String, minPartitions: Int = defaultMinPartitions)pathTextInputFormatLongWritableTextminSplitshadoopFile[K, V, F <: InputFormat[K, V]](path: String, minPartitions: Int)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]pathFKVminSplitshadoopFile[K, V, F <: [K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]pathFKVDefaultMinSplitshadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]pathinputFormatClasskeyClassvalueClassdefaultMinPartitionshadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]n/ainputFormatClasskeyClassvalueClassdefaultMinPartitionssequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions)(implicit km: ClassTag[K], vm: ClassTag[V], kcf: () ⇒ WritableConverter[K], vcf: () ⇒ WritableConverter[V]): RDD[(K, V)]pathSequenceFileInputFormat[K,V]KVdefaultMinPartitionsobjectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T]pathSequenceFileInputFormat[NullWritable,BytesWritable]NullWritableBytesWritableminSplits兼容新版本HadoopAPI的创建操作
文件路径输入格式键类型值类型分区值newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]pathFKVn/anewAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]pathFKVn/anewAPIHadoopRDD[K, V, F <: InputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]n/aFKVn/a注意:
在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了.读取示例:
scala> import org.apache.hadoop.io._ import org.apache.hadoop.io._ scala> val data = sc.parallelize(Array((30,"hadoop"), (71,"hive"), (11,"cat"))) data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at parallelize at <console>:35 scala> data.saveAsNewAPIHadoopFile("hdfs://node01:8020/output4/",classOf[LongWritable] ,classOf[Text] ,classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]])对于RDD最后的归宿除了返回为集合和标量,也可以将RDD存储到外部文件系统或者数据库中,Spark系统与Hadoop是完全兼容的,所以MapReduce所支持的读写文件或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套API. 将RDD保存到HDFS中在通常情况下需要关注或者设置五个参数,即文件保存的路径,key值的class类型,Value值的class类型,RDD的输出格式(OutputFormat,如TextOutputFormat/SequenceFileOutputFormat),以及最后一个相关的参数codec(这个参数表示压缩存储的压缩形式,如DefaultCodec,Gzip,Codec等等)
兼容旧版API
saveAsObjectFile(path: String): UnitsaveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): UnitsaveAsTextFile(path: String): UnitsaveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): UnitsaveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): UnitsaveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[_, ]], codec: Class[ <: CompressionCodec]): UnitsaveAsHadoopDataset(conf: JobConf): Unit这里列出的API,前面6个都是saveAsHadoopDataset的简易实现版本,仅仅支持将RDD存储到HDFS中,而saveAsHadoopDataset的参数类型是JobConf,所以其不仅能够将RDD存储到HDFS中,也可以将RDD存储到其他数据库中,如Hbase,MangoDB,Cassandra等.
兼容新版API
saveAsNewAPIHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): UnitsaveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): UnitsaveAsNewAPIHadoopDataset(conf: Configuration): Unit同样的,前2个API是saveAsNewAPIHadoopDataset的简易实现,只能将RDD存到HDFS中,而saveAsNewAPIHadoopDataset比较灵活.新版的API没有codec的参数,所以要压缩存储文件到HDFS中每需要使用hadoopConfiguration参数,设置对应mapreduce.map.output.compress.codec参数和mapreduce.map.output.compress参数. 注意: 1.如果不知道怎么将RDD存储到Hadoop生态的系统中,主要上网搜索一下对应的map-reduce是怎么将数据存储进去的,然后改写成对应的saveAsHadoopDataset或saveAsNewAPIHadoopDataset就可以了.
写入示例:
scala> val read = sc.newAPIHadoopFile[LongWritable, Text, org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("hdfs://node01:8020/output4/part*", classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat], classOf[LongWritable], classOf[Text]) read: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = hdfs://node01:8020/output4/part* NewHadoopRDD[48] at newAPIHadoopFile at <console>:35 scala> read.map{case (k, v) => v.toString}.collect res44: Array[String] = Array(30 hadoop, 71 hive, 11 cat)Spark 支持读写很多种文件系统, 像本地文件系统、Amazon S3、HDFS等。
实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。
调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。
建数据库和表语句
CREATE DATABASE bigdata CHARACTER SET utf8; CREATE TABLE `t_student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8; SELECT * FROM t_student支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:
Mysql读取:
def main (args: Array[String] ) { val sparkConf = new SparkConf ().setMaster ("local[2]").setAppName ("JdbcApp") val sc = new SparkContext (sparkConf) val rdd = new org.apache.spark.rdd.JdbcRDD ( sc, () => { Class.forName ("com.mysql.jdbc.Driver").newInstance() java.sql.DriverManager.getConnection ("jdbc:mysql://localhost:3306/rdd", "root", "hive") }, "select * from rddtable where id >= ? and id <= ?;", 1, 10, 1, r => (r.getInt(1), r.getString(2))) println (rdd.count () ) rdd.foreach (println (_) ) sc.stop () }Mysql写入:
def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf) val data = sc.parallelize(List("Female", "Male","Female")) data.foreachPartition(insertData) } def insertData(iterator: Iterator[String]): Unit = { Class.forName ("com.mysql.jdbc.Driver").newInstance() //将数据存入到MySQL //获取连接 val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "admin") iterator.foreach(data => { //将每一条数据存入到MySQL val ps = conn.prepareStatement("insert into rddtable(name) values (?)") ps.setString(1, data) ps.executeUpdate() }) }JdbcRDD 接收这样几个参数。
首先,要提供一个用于对数据库创建连接的函数。这个函数让每个节点在连接必要的配置后创建自己读取数据的连接。接下来,要提供一个可以读取一定范围内数据的查询,以及查询参数中lowerBound和 upperBound 的值。这些参数可以让 Spark 在不同机器上查询不同范围的数据,这样就不会因尝试在一个节点上读取所有数据而遭遇性能瓶颈。这个函数的最后一个参数是一个可以将输出结果从转为对操作数据有用的格式的函数。如果这个参数空缺,Spark会自动将每行结果转为一个对象数组。Cassandra数据库和ElasticSearch集成:
由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.Result。 Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。 Maven如下:
<hbase.version>1.2.0-cdh5.16.2</hbase.version> <!-- HBase Client 依赖 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency>回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable,Value:Put。
写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。
HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。
范例演示:将词频统计结果保存HBase表,表的设计 首先创建表及查看表:
create 'htb_wordcount', 'info', 'count' describe htb_wordcount scan 'htb_wordcount' import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 将RDD数据保存至HBase表中 */ object SparkWriteHBase { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // TODO: 1、构建RDD val list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765)) val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2) // TODO: 2、将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value) // TODO: 组装RDD[(ImmutableBytesWritable, Put)] /** * HBase表的设计: * 表的名称:htb_wordcount * Rowkey: word * 列簇: info * 字段名称: count */ val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions{ iter => iter.map { case (word, count) => // 创建Put实例对象 val put = new Put(Bytes.toBytes(word)) // 添加列 put.addColumn( // 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组 Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString) ) // 返回二元组 (new ImmutableBytesWritable(put.getRow), put) } } // 构建HBase Client配置信息 val conf: Configuration = HBaseConfiguration.create() // 设置连接Zookeeper属性 conf.set("hbase.zookeeper.quorum", "node1.itcast.cn") conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("zookeeper.znode.parent", "/hbase") // 设置将数据保存的HBase表的名称 conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount") /* def saveAsNewAPIHadoopFile( path: String,// 保存的路径 keyClass: Class[_], // Key类型 valueClass: Class[_], // Value类型 outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现 conf: Configuration = self.context.hadoopConfiguration // 配置信息 ): Unit */ putsRDD.saveAsNewAPIHadoopFile( "datas/spark/htb-output-" + System.nanoTime(), // classOf[ImmutableBytesWritable], // classOf[Put], // classOf[TableOutputFormat[ImmutableBytesWritable]], // conf ) // 应用程序运行结束,关闭资源 sc.stop() } }运行完成以后,使用hbase shell查看数据:
回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result。 从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下: 此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示: 范例演示:
从HBase表读取词频统计结果,代码如下
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 从HBase 表中读取数据,封装到RDD数据集 */ object SparkReadHBase { def main(args: Array[String]): Unit = { // 创建应用程序入口SparkContext实例对象 val sc: SparkContext = { // 1.a 创建SparkConf对象,设置应用的配置信息 val sparkConf: SparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .setMaster("local[2]") // TODO: 设置使用Kryo 序列化方式 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // TODO: 注册序列化的数据类型 .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result])) // 1.b 传递SparkConf对象,构建Context实例 new SparkContext(sparkConf) } sc.setLogLevel("WARN") // TODO: a. 读取HBase Client 配置信息 val conf: Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "node1.itcast.cn") conf.set("hbase.zookeeper.property.clientPort", "2181") conf.set("zookeeper.znode.parent", "/hbase") // TODO: b. 设置读取的表的名称 conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount") /* def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V] ): RDD[(K, V)] */ val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD( conf, // classOf[TableInputFormat], // classOf[ImmutableBytesWritable], // classOf[Result] // ) println(s"Count = ${resultRDD.count()}") resultRDD .take(5) .foreach { case (rowKey, result) => println(s"RowKey = ${Bytes.toString(rowKey.get())}") // HBase表中的每条数据封装在result对象中,解析获取每列的值 result.rawCells().foreach { cell => val cf = Bytes.toString(CellUtil.cloneFamily(cell)) val column = Bytes.toString(CellUtil.cloneQualifier(cell)) val value = Bytes.toString(CellUtil.cloneValue(cell)) val version = cell.getTimestamp println(s"\t $cf:$column = $value, version = $version") } } // 应用程序运行结束,关闭资源 sc.stop() } }运行结果:
HBase简单读取:
def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() //HBase中的表名 conf.set(TableInputFormat.INPUT_TABLE, "fruit") val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = hBaseRDD.count() println("hBaseRDD RDD Count:"+ count) hBaseRDD.cache() hBaseRDD.foreach { case (_, result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes)) val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes)) println("Row key:" + key + " Name:" + name + " Color:" + color) } sc.stop() }HBase简单写入:
def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark") val fruitTable = TableName.valueOf("fruit_spark") val tableDescr = new HTableDescriptor(fruitTable) tableDescr.addFamily(new HColumnDescriptor("info".getBytes)) val admin = new HBaseAdmin(conf) if (admin.tableExists(fruitTable)) { admin.disableTable(fruitTable) admin.deleteTable(fruitTable) } admin.createTable(tableDescr) def convert(triple: (Int, String, Int)) = { val put = new Put(Bytes.toBytes(triple._1)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3)) (new ImmutableBytesWritable, put) } val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13))) val localData = initialRDD.map(convert) localData.saveAsHadoopDataset(jobConf) }