Spark
Spark RDD(弹性分布式数据集)
RDD(Resilient Distrbuted Dataset,弹性分布式数据集),是一个容错的、并行的 数据结构 ,可以让用户显式地将数据存储到磁盘和内存中, 并且还能控制分区。对于迭代式计算和交互式数据挖掘,RDD可以将中间计算的数据结构保存在内存,若是后面需要中间结果参与计算时,则可以直接从内存中读取,极高地提高了计算速度。
RDD五大特性
分区列表(a list of partitions)
每个RDD被分为多个分区,这些分区运行在不同节点,每个分区都会被计算任务处理。在RDD创建时,默认分区数量为该程序所分配的资源CPU核数(一个Core可以承载2~4个Partition),如果是从HDFS文件创建,默认为文件的 Block 数。每个分区都有一个计算函数(a function for computing each split)
Spark的RDD计算函数都是以分片为基本单位,每个RDD都会实现 compute 函数,对具体的分片进行计算依赖于其他RDD(a list of dependencies on other RDDs)
RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行计算。(K,V) 数据类型的RDD分类器(a partitioner for Key-Value RDDs)
当前Spark中实现了两种类型的分区函数基于哈希的 HashPartitioner
基于范围的 RangePartitioner
只有(K,V)的RDD才会有分区,非(K,V)的RDD,分区的指是 None.
每个分区数量都会有一个优先位置列表(a list of preferred locations to compute each split on)
优先位置列表会存储每个分区(Partition)的优先位置,对于一个HDFS文件来说,就是每个Parttion块的位置。“移动数据不如移动计算”的概念Spark进行任务调度的时候,会尽可能地将计算任务分配到其所有处理数据块的存储位置。
创建RDD
- 从文件系统加载数据创建RDD
1
2
3
4
5
6//1. 从linux本地文件系统加载数据创建RDD
scala> val test = sc.textFile("file:///export/data/test.txt")
//2. 从HDFS中加载数据创建RDD
// HDFS 上的 /data 目录下有一个 test.txt 文件
// 也可以改为 hdfs://localhost:9000/data/test.txt
scala> val test = sc.textFile("/data/test.txt") - 通过并行集合创建RDD
1
2scala> val array = Array(1,2,3,4,5)
scala> val arrRDD = sc.parallelize(array)
RDD转换算子
“转换” 操作主要用于根据已有RDD创建新的RDD,每一次通过 Transformation 算子计算后都会返回一个新的 RDD。
| 序号 | 转换算子 | 相关说明 |
|---|---|---|
| 1 | filter(func) | 筛选出满足函数的元素,并返回一个新的数据集 |
| 2 | map(func) | 将每个元素传递到函数中,返回结果是新的数据集 |
| 3 | flatMap(func) | 与map类似,但是每个输入都可以映射到0或者多个输出 |
| 4 | groupByKey() | 应用于Key-Value数据集时,返回一个新的(Key, Iterable {Value}) |
| 5 | reduceByKey(func) | 应用于Key-Value数据集时,返回一个新的Key-Value形式数据集。其中每个Value值是每个Key传递到函数func中进行聚合后的结果 |
- filter(func)
1
2
3scala> val test = sc.textFile("file:///export/data/test.txt")
// 排除“spark”以外的单词
scala> val linesWithspark = test.filter(line => line.contains("spark")) - map(func)
1
2
3scala> val test = sc.textFile("file:///export/data/test.txt")
// 将文本分成一个个单词
scala> val words = test.map(line => line.split(" ")) - flatMap(func)
1
2
3scala> val test = sc.textFile("file:///export/data/test.txt")
// 语句会先执行map(func),后执行 flat() 扁平化操作。
scala> val words = test.flatMap(line => line.split(" ")) - groupByKey()
1
2
3
4
5
6scala> val lines = sc.textFile("file:///export/data/test.txt")
// 得到 ("spark",1), ("spark",1), ("spark",1)
scala> val words = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
// 合并相同key
// ("spark", (1,1,1))
scala> val groupWords = words.groupByKey() - reduceByKey(func)
1
2
3
4
5
6
7
8scala> val lines = sc.textFile("file:///export/data/test.txt")
scala> val words = lines.flatMap(line => line.split(" ")).map(word => (word, 1))
// 1. reduceByKey()操作,把所有Key相同的Value值合到一起,生成一个新的 Key-value 然后执行 func 函数,把(1,1,1)进行聚合求和,得到最终结果 ("spark", 3)
scala> val reduceWords = words.reduceByKey((a,b) => a+b)
// 打印
scala> reduceWords.foreach(printlnsdd)
行动算子
| 行动算子 | 相关说明 |
|---|---|
| count() | 返回数据集中的元素个数 |
| first() | 返回数组的第一个元素 |
| take(n) | 以数组的形式返回数组集中的前 n 个元素 |
| reduce(func) | 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素 |
| collect() | 以数组的形式返回数据集中的所有元素 |
| foreach(func) | 将数据集中的每个元素传递到函数 func 中运行 |
RDD 机制
持久化机制
| 存储级别 | 相关说明 |
|---|---|
| MEMORY_ONLY | 默认存储级别。将RDD作为返序列化的Java对象,缓存到JVM中,若内存放不下(内存满),某些分区将不会被缓存,并且每次需要时重新计算 |
| MEMORY_AND_DISK | 将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存满),则将剩余分区存储到磁盘上,并在需要时从磁盘读取 |
| MEMORY_ONLY_SER | 将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的 Java对象节省空间,但读取时CPU占用更大 |
| MEMORY_AND_DISK_SET | 与MEMORY_ONLY_SER 类似,但是当内存放不下时则易出到磁盘,而不是每次需要时重新计算。 |
| DISK_ONLY | 仅将RDD分区全部存储到磁盘上 |
| MEMORY_ONLY_2 MEMORY_AND_DISK_2 |
与上面级别相同。若加上后缀_2,代表的是将每个持久化的数据都复制一份副本,并将副本保存到磁盘上 |
| OFF_HEAP(实验性) | 与 MEMORY_ONLY_SET 类似,但将数据存储在堆外内存中(这需要启用堆外内存) |
- 使用 persist() 方法对 RDD 进行持久化
1 | scala> import org.apache.spark.storage.StorageLevel |
容错机制
- 血统方式
根据 RDD 之间的依赖关系对丢失数据的 RDD 进行数据恢复。当子 RDD 进行窄依赖运算,只需要把丢失数据的父 RDD 的对于分区进行重新计算即可。当子 RDD 进行宽依赖计算时,需要父 RDD 的所有分区进行全局计算,这种方式会导致数据冗余计算。 - 检查点方式
将 RDD 写入磁盘进行存储,当RDD 进行宽依赖运算时,只需要在中间阶段设置一个检查点进行容错,通过 Spark 中的 sparkContext 对象调用 setCheckpoint() 方法,设置一个容错文件相同目录(HDFS)做检查点。从检查点的 RDD 开始计算,不用全局计算,减少开销。
Spark SQL 结构化数据文件处理
Spark SQL 的前身是 Shark,它运行在 Spark 系统之上,重用了 Hive 的工作机制,直接继承了 Hive 的各个组件。Spark SQL 主要有以下三个功能,支持多种数据源的查询与加载,兼容 Hive 可以使用 JDBC\ODBC 的连接方式来执行 SQL 语句。
- 可以从各种结构化数据源(JSON、Hive、Parquet等)中读取数据进行分析
- 包含行业标准的 JDBC 和 ODBC 连接方式,不局限于程序内使用。
- 可以无缝地将 SQL 查询与 Spark 程序进行结合,能够将结构化数据作为 spark 中的分布式数据集(RDD) 进行查询。
DataFrame
| 代码示例 | 描述 |
|---|---|
| spark.read.text(“people.txt”) | 读取 txt 格式文本文件,创建DataFrame |
| spark.read.csv(“people.csv”) | 读取 csv 格式文本文件,创建DataFrame |
| spark.read.json(“people.json”) | 读取 json 格式文本文件,创建DataFrame |
| spark.read.parquet(“people.parquet”) | 读取 parquet 格式文本文件,创建DataFrame |
通过文件直接创建 DataFrame
1 | scala> val personDF = spark.read.text("/spark/test/words.txt") |
RDD 转换 DataFrame
1 | scala> val lineRDD = sc.textFile("/spark/test/words.txt").map(_.split(" ")) |
DataFrame 常用操作
DataFrame 提供了两种语法风格
领域特定语言(DSL)风格操作
- show(): 查看 DataFrame 中的具体内容信息
- printSchema():查看 DataFrame 的Schema信息
- select():查看 DataFrame 中选取部分列的数据
1
2
3
4
5
6
7
8
9
10//scala> personDF.select("name").show
scala> personDF.select(personDF.col("name")).show()
// 重命名列
scala> personDF.select(personDF("name").as("username"), personDF("age")).show()
// 过滤 age 大于或等于 25 的数据
scala> personDF.filter(personDF("age") >= 25).show()
// 按年龄进行分组并统计相同年练的人数
scala> personDF.groupBy("age").count().show()
// 按年龄降序排列
scala> personDF.sort(personDF("age").desc).show()
SQL 风格操作
1
2
3
4
5
6
7
8// 将 DataFrame 注册为临时表
scala> personDF.registerTempTable("t_person")
// 查询年龄最大的两个的信息
scala> spark.sql("select * from t_person order by age desc limit 2").show()
// 查询年龄大于25岁的人的信息
scala> spark.sql("select * from t_person where age > 25").show()
Dataset 的基础知识
Dataset 结合了 RDD、DataFrame 与 Dataset 三者的优点。可以通过 SparkSession 中的 createDataset 来创建
1 | scala> val personDS = spark.createDataset(sc.textFile("/spark/person.txt")) |
RDD 转换为 DataFrame
反射机制推断 Schema
- 添加 Spark SQL 依赖 (pom.xml)
1
2
3
4
5
6<!-- 添加 Spark SQL 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency> - 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.msga.spark.sparkdemo.reflect
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
// 定义样例类(利用反射机制推断RDD模式时需要定义一个 case 类,定义了 Table 的结构)
case class Person(id:Int, name:String, age:Int)
object CaseClassSchema {
def main(args: Array[String]): Unit = {
//1. 构建 SparkSession
val spark : SparkSession = SparkSession.builder()
.appName("CaseClassSchema")
.master("local[2]")
.getOrCreate()
// 2. 获取 SparkContext
val sc : SparkContext = spark.sparkContext
// 设置日志级别
sc.setLogLevel("WARN")
//3. 读取文件
val data: RDD[Array[String]] = sc.textFile("D:\\Java\\ideaProjects\\sparkdemo\\src\\main\\resources\\person.txt").map(x => x.split(" "))
//4. 将 RDD 与样例关联
val personRdd: RDD[Person] = data.map(x => Person(x(0).toInt, x(1), x(2).toInt))
//5. 获取DataFrame
//手动导入隐式转换
import spark.implicits._
val personDF: DataFrame = personRdd.toDF
// ------------ DSL 风格操作 ------------
//1. 显示 DataFrame 的数据,默认显示 20 行
personDF.show()
//2. 显示 DataFrame 的 schema 信息
personDF.printSchema()
//3. 统计 DataFrame 中年龄大于 30 岁的人数
println(personDF.filter($"age" > 30).count())
// ----------- DSL 风格操作结束 --------
// ----------- SQL 风格操作 -------------
//1. 将 DataFrame 注册成表
personDF.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show()
spark.sql("select * from t_person where name='zhangsan'").show()
// ------------SQL 风格操作结束 ---------
// 关闭资源操作
sc.stop()
spark.stop()
}
}
编程方式定义 Schema
当 case 类不能提前定义时,需要常用编程方式定义 Schema 信息
步骤
- 创建一个 Row 对象结构的 RDD
- 基于 StructType 类型创建 Schema
- 通过SparkSession 提供的 createDataFrame() 方法来拼接 Schema
1 | package com.msga.spark.sparkdemo.code |
Spark SQL 操作数据源
操作MySQL
操作 MySQL
1 | mysql -uroot -p |
编写 pom.xml
1 | <!-- MySQL 数据库连接 --> |
scala 程序操作
1 | package com.msga.spark.sparkdemo.mysqldemo |
写入数据 MySQL
1 | package com.msga.spark.sparkdemo.mysqldemo |
操作 Hive 数据集
Hive 是 Hadoop 上的 SQL 引擎,也是大数据中重要的数据仓库工具。
- 准备环境
1
2
3
4
5
6# Hive 常用 MySQL 数据库存放 Hive 元数据, 需要将 MySQL 启动复制到 Spark 下的 jars 目录下
cp mysql-connector-java-8.0.13.jar /export/servers/spark/jars/
# 配置 hive-site.xml 配置文件复制到 Spark 配置文件中。
ln -s /export/servers/apache-hive-xxx-bin/conf/hive-site.xml \
/export/servers/spark/conf/hive-site.xml
hive-site.xml
1 | <property> |
- 在 Hive 中创建数据库和表
首先在 hadoop01 节点上启动 Hive 服务,创建数据库和表
1 | # 启动 Hive 程序 |
- Spark SQL 操作 Hive 数据库
执行 Spark-shell ,进入 sparksqltest 数据仓库,查看 person 表是否存在
1 | spark-shell --master spark://hadoop01:7077 |
- 向 Hive 表写入数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27scala> spark.sql("select * from person").show
scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> val personRDD = spark.sparkContext.parallelize(Array("3 zhangsan 22", "4 lisi 29")).map(_.split(" "))
// 设置 Schema
scala> val schema: StructType = StructType(List(
StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 创建 Row 对象
scala> val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
// 建立 rowRDD 与 Schema 对应关系,创建 DataFrame
scala> val personDF = spark.createDataFrame(rowRDD, schema)
// 注册临时表
scala> personDF.registerTempTable("t_person")
// 将数据插入 Hive 表
scala> spark.sql("insert into person select * from t_person")
// 查询表数据
scala> spark.sql("select * from person").show
HBase
HBase的特性
- 容量大
HBase 分布式数据库中的表可以存储成千上万的行和列组成的数据 - 面向列
Hbase 面向列存储和权限控制,支持独立检索。按照列存储根据数据动态增加列,并且可以单独对列进行各种操作。 - 多版本
表中的每一个列的数据存储都会有多个版本(version),一般每列对应一条数据,但是有些数据会有多个版本,例如个人的历史修改信息。 - 稀疏性
表允许为空,空列不会占用存储空间,因此表可以设计的非常稀疏 - 扩展性
底层依赖于 HDFS。当磁盘空间不足是,可以动态增加机器节点(DataNode)来增加磁盘空间,从而避免关系型数据库一样需要数据迁移。 - 高可靠性
底层使用的是 HDFS, HDFS 具有备份机制, Spark 出现问题时,Replication(即副本)机制能够保证数据不会发生丢失或损坏。
HBase 与传统数据库的区别
- 存储模式。传统数据库基于行存储,而HBase是基于列存储
- 表字段。传统数据库表字段不能超过 30 个,而 HBase 不设限制
- 可延伸性。传统数据库存储的列是固定的,HBase可以去动态地增加列,列是不固定的
HBase 的数据模型
Row Key(行键)
每个 HBase 表中只能有一个行键,它在 HBase 中以字典序的方式存储。数据存储的规则是相近的数据存储到一起。例如www.apache.org、mail.apache.org,可以将其反转为org.apache.www、org.apache.mail, 然后进行存储,这样会把所有的 org.apache 域名存储在一起,避免子域名分散在各处.Timestamp(时间戳)
记录每次操作数据的时间,通常作为数据的版本号。Column(列)
HBase 表是有列族名、限定符以及列名组成的,其中“:”为限定符。创建 HBase 表不需要指定列,因为列是可变的,非常灵活。ColumnFamily(列族)
在 HBase 中,列族由许多列组成。同一个表里,不同列族由完全不同的属性,但是一个列族内的所有列都会有系统的属性,而属性都是定义在列族上的。c1、c2、c3 均为列族名。
| Row Key | Timestamp | Column Family:c1 | Column Family:c2 | Column Family:c3 | |||
| Column | value | Column | value | Column | value | ||
| r1 | t7 | c1:col-1 | value-1 | c3:col-1 | value-1 | ||
| t6 | c1:col-2 | value-2 | c3:col-2 | value-2 | |||
| t5 | c1:col-3 | value-3 | |||||
| t4 | |||||||
| r2 | t3 | c1:col-1 | value-1 | c2:col-1 | value-1 | c3:col-1 | value-1 |
| t2 | c1:col-2 | value-2 | |||||
| t1 | c1:col-3 | value-3 | |||||
HBase 的集群部署
HBase 通过使用 Zookeeper 来实现高可用集群。
步骤
到官网下载HBase
解压到 /export/servers/ 下
将 ${hadoop_path}/etc/hadoop 目录下的 hdfs-site.xml 和 core-site.xml 配置文件复制到 HBase 的 conf 目录下
1
cp hadoop-2.10.1/etc/hadoop/{hdfs-site.xml,core-site.xml} /export/servers/hbase-2.4.2/conf/
进入 ${hbase_path}/conf 目录修改 hbase-env.sh 配置文件, 指定 JDK 环境变量并配置 Zookeeper.
1
2export JAVA_HOME=/export/servers/jdk
export HBASE_MANAGES_ZK=false配置 hbase-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23<!-- 指定Zookeeper地址,用.,.分隔 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
<!-- 指定HBase在HDFS的存储路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop01:9000/hbase</value>
</property>
<!-- 指定HBase是分布式的 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>./tmp</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>修改 regionservers
1
2hadoop02
hadoop03修改 backup-masters 配置文件,当中单点故障配置备用 主节点
1
hadoop02
修改 profile 文件
1
2
3
4
5
6
7vi /etc/profile
# 添加到 profile文件里
# HBase
export HBASE_HOME=/export/servers/hbase-2.4.2
# 配置zookeeper使用外部zookeeper(默认内部zookeeper)
export PATH=$PATH:$HBASE_HOME/bin:将安装目录 分发到其他服务器
1
2
3# 分发到另外两台电脑
scp -r /export/servers/hbase-2.4.2/ hadoop02:/export/servers/
scp -r /export/servers/hbase-2.4.2/ hadoop03:/export/servers/启动HBase集群
启动zookeeper 和 HDFS
1
2
3
4# 所有节点运行
zkServer.sh start
# 在主节点运行
start-dfs.sh启动HBase集群
启动HBase保证时间同步,否则报错 ClockOutOfSynException, 用data查看, 用__ntpdate –u cn.pool.ntp.org__命令同步时间.
启动HBase集群: start-hbase.sh
浏览器输入: hadoop01:16010
HBase 的基本操作
| 命令名称 | 相关说明 | 命令名称 | 相关说明 |
|---|---|---|---|
| create | 创建表 | count | 统计表中数据的行数 |
| put | 插入或更新数据 | delete | 删除指定行或列的数据 |
| scan | 扫描表并返回表的数据 | deleteall | 删除整个行或列的数据 |
| describe | 查看表的结构 | truncate | 删除整个表中的数据,不删除表结构 |
| get | 获取指定行或列的数据 | drop | 删除整个表,数据和结构都删除 |
HBase Shell 操作
1 | hbase-2.4.2/bin/hbase shell |
HBase 的 Java API 操作
常见的 Java API操作
| 类或接口名称 | 相关说明 |
|---|---|
| Admin | 类,建立客户端和 HBase 数据库的连接 |
| HBaseConfiguration | 类,将 HBase 配置添加到配置文件中 |
| HTableDescriptor | 接口,用于描述表的信息 |
| HColumnDescriptor | 类,用于描述列族的信息 |
| Table | 接口,实现 HBase 表的通信 |
| Put | 类,插入数据操作 |
| Get | 类,查询单条记录 |
| Delete | 类,删除数据 |
| Scan | 类,查询所有记录 |
| Result | 类,查询返回单条记录结果 |
- 导入 Maven 依赖
1 | <!-- ++++++++++++++++ HBase 学习 ++++++++++++++++ --> |
- Java 代码
- 连接集群,创建 HBase 表
1 | import org.apache.hadoop.conf.Configuration; |
- 插入数据
1 |
|
查看指定字段的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void testGet() throws Exception{
// 获取一个 table 对象
Table table = conn.getTable(TableName.valueOf("t_user_info"));
// 创建 get 查询承诺书对象,指定要获取的是哪一行
Get get = new Get("user001".getBytes());
// 返回查询结果的数据
Result result = table.get(get);
// 获取结果所有的 cell
List<Cell> cells = result.listCells();
// 遍历所有的 cell
for(Cell c : cells) {
// 获取行键
System.out.println("行:" + Bytes.toString(CellUtil.cloneRow(c)));
// 得到列族
System.out.println("列族:" + Bytes.toString(CellUtil.cloneFamily(c)));
System.out.println(":" + Bytes.toString(CellUtil.cloneQualifier(c)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(c)));
}
// 关闭
table.close();
conn.close();
}- 扫描数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void testScan() throws Exception {
// 获取 table 对象
Table table = conn.getTable(TableName.valueOf("t_user_info"));
// 创建 Scan 对象
Scan scan = new Scan();
// 获取查询结果
ResultScanner rs = table.getScanner(scan);
// 获取查询的数据
Iterator<Result> iterator = rs.iterator();
while (iterator.hasNext()){
// 获取当前每一行数据
Result result = iterator.next();
// 获取当前每一行的 cell 对象
List<Cell> cells = result.listCells();
for (Cell cell : cells){
// 获取行键
byte[] rowArray = cell.getRowArray();
byte[] familyArray = cell.getFamilyArray();
byte[] qualifierArray = cell.getQualifierArray();
byte[] valueArray = cell.getValueArray();
// 获取行键
System.out.println("行:" + new String(rowArray, cell.getRowOffset(), cell.getRowLength()));
System.out.println("列族::" + new String(familyArray, cell.getRowOffset(), cell.getRowLength()));
System.out.println(":" + new String(qualifierArray, cell.getRowOffset(), cell.getRowLength()));
System.out.println("值:" + new String(valueArray, cell.getRowOffset(), cell.getRowLength()));
}
System.out.println("--------------------------");
}
table.close();
conn.close();
}- 删除表
1
2
3
4
5
6
7
8
9
10
11
public void testDrop() throws Exception {
// 获取一个表的管理器
Admin admin = conn.getAdmin();
// 删除表是先需要 disable
admin.disableTable(TableName.valueOf("t_user_info"));
admin.deleteTable(TableName.valueOf("t_user_info"));
admin.close();
conn.close();
}HBase 原理
架构
HBase 构建在 HDFS 之上, HDFS 为此提供了高可用的底层存储支持, Hadoop MapReduce 为此提供了高性能的计算能力,Zookeeper 为 HBase 提供了稳定的服务和容错机制。
- Client: 通过 RPC 协议与 HBase 通信
- Zookeeper: 分布式协调服务,用作监控 HRegionServer 状态,将 HRegionServer 上线情况实时通知给 HMaster,确保集群只有一个 HMaster 在工作
- HMaster: HBase 的主节点,协调、监控 HRegionServer 以及 平衡 HRegionServer 之间的负载。 HMaster 还负责为 HRegionServer 分配 HRegion
- HRegionServer: HBase 的从节点,包括了多个 HRegion,用于响应用户 I/O 请求,向 HDFS 写入数据
- HRegion:HBase 表的分片,每个分片报错的是表中某段连续的数据。
- Store:HRegion 包含一个或多个 Store,每个 Store 都用于管理一个 Region 上的一个列族
- MemStore: 内存级缓存,MemStore 存放在 Store 中,用于保存修改的数据(K-V 形式)当存储数据达到一个阀值(默认 128M)时,数据就会被 flush 操作,将数据写入 StoreFile 文件,flush 操作由专门的线程负责。
- StoreFile: MemStore 数据写入到文件后存储的就是 StoreFile,StoreFile 底层是以 HFile 的格式保存在 HDFS 上。
- HFile: HBase 中键值对类型的数据均以 HFile 文件格式进行存储
- HLog:预写日志文件,负责记录 HBase 的修改。当 HBase 读写数据是,数据不是直接写进磁盘,而是在内存中保留一段时间。如果数据写入预写日志文件中,然后再写入内存中,一旦系统故障,可以通过日志文件恢复数据。
物理存储
- 按照行键 Row Key 的字典序进行排列,切分多个 HRegion 存储。
- 每个 Region 存储的数据是有限的,达到一个阀值(128MB)是会被等切分成两个新的 Region 。
- 一个 HRegionServer上可以存储多个 Region 但是每个 Region只能分布到一个 HRegionServer 上
- MemStore 中存储的是用户写入的数据,达到阀值时,会以 HFile 存储到 HDFS 上。
寻址机制
Zookeeper 存储的是 ROOT 表的数据,而 ROOT 表存储的是 META 表的 Regin 信息,也就是所有 RegionServer 的地址。
- Client 访问 Zookeeper 请求行键 RK001 数据所在的 RegionServer 地址
- Zookeeper 从 ROOT 表中查询所有表的 META 信息
- META 表将具体存储行键RK001 数据的 ReginServer 地址返回给 Client,相当于 Client从 Zookeeper中 META 表中查询到 ReginServer 的地址。
- Client 获取到地址后,直接向RegionServer 发送查询行键为 RK001 的这条数据的请求,RegionServer 收到请求就会查询行键 RK001 的 Regino
- RegionServer 将行键为 RK001 的数据返回给 Client
HBase 和 Hive 整合
HBase 不支持 SQL 语法,因此 操作和计算 HBase 分布式数据库时非常不方便,效率也偏低。通过 Hive 数据仓库操作 HBase 分布式数据库中的数据,可以满足业务需求。
环境搭建 vi /etc/profile
1
2
3
4
5export HBASE_HOME=/export/servers/hbase-2.4.2
export PATH=$PATH:$HBASE_HOME/bin:
export HIVE_HOME=/export/servers/hive-3.1.2
export PATH=$PATH:$HIVE_HOME/bin:导入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15cd /export/servers/hbase-2.4.2
# HBase 基本包
cp -r lib/hbase-common-2.4.2.jar \
# HBase 服务端
lib/hbase-server-2.4.2.jar \
# HBase 客户端
lib/hbase-client-2.4.2.jar \
# HBase 通信
lib/hbase-protocol-2.4.2.jar \
# 整合其他框架做测试
lib/hbase-it-2.4.2 \
# 兼容 Hadoop 其他版本
lib/hbase-hadoop2-compat-2.4.2.jar \
lib/hbase-hadoop-compat-2.4.2.jar \
/export/servers/hive-3.1.2/lib/修改配置文件 __vi hive-3.1.2/conf/hive-site.xml __
1 | <!-- 指定 zookeeper 集群地址 --> |
- 启动相关服务(Zookeeper Hadoop MySQL Hive HBase)
1 | # 启动 zookeeper 集群,脚本 |
创建表
1
2
3
4
5
6
7
8
9
10
11
12CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");创建 hive 临时中间表
1
2
3
4
5
6
7
8
9
10
11
12
13CREATE TABLE emp (
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
# 查看
show tables;创建文件 vi /export/data/emp.txt
1
2
3
4
5
6
7
8
9
10
117369 SMITH CLERK 7902 1980-12-11 800.0 20
7379 LIWSI CLERK 7902 1980-12-11 800.0 20
7383 ZHANGSAN SALESMAN 7602 1980-12-11 1100.0 20
7384 QQXIU SALEMAN 7602 1988-12-15 1000.0 30
7386 FORTHEW MANGER 1202 1980-02-20 20000.0 20
7409 ABCDE MANGER 1202 1976-03-03 20000.0 20
7419 LIBIBI CLERK 7902 1945-04-23 800.0 20
7452 BILIBIL ANALYST 7782 1923-11-26 2000.0 20
7529 BEIKLI CLERK 7902 1998-11-12 800.0 20
7534 HEKLILK CLERK 7902 1967-08-15 800.0 20
7603 MIIIUK CLERK 7902 1980-02-15 800.0 20插入数据
1
2
3
4
5
6
7
8
9# 插入到临时表
load data inpath '/export/data/emp.txt' into table emp;
# 临时表导入到 hive_hbase_emp_table 表
insert into table hive_hbase_emp_table select * from emp;
# 查看整合情况
hive> select * from hive_hbase_emp_table;
hbase> scan 'hbase_emp_table'
- 本文作者: MISAKIGA
- 本文链接: https://misakiga.github.io/2021/07/18/big-data/Spark/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!
