导航:首页 > 网络数据 > hive去重distinct大数据

hive去重distinct大数据

发布时间:2022-09-17 05:43:00

Ⅰ Hive优化

(二)数据倾斜的解决方案
1、参数调节
hive.map.aggr=true
Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true
有数据倾斜的时候 进行负载均衡 ,当选项设定为true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Rece 中,每个 Rece 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Rece 中 ,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Rece 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Rece 中),最后完成最终的聚合操作。

2、SQL语句调节
Join:
关于驱动表的选取:选用join key分布最均匀的表作为驱动表

做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。

大表与小表Join
使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成rece。

大表与大表Join:
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的rece上,由于null值关联不上,处理后并不影响最终结果。

count distinct大量相同特殊值:
count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。

group by维度过小:
采用sum() 与group by的方式来替换count(distinct)完成计算。

特殊情况特殊处理:
在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

3、应用场景
(一)空值产生的数据倾斜
场景:
如日志中,常会有信息丢失的问题,比如日志中的user_id,如果取其中的user_id和用户表中的user_id 关联,会碰到数据倾斜的问题。

解决方法1 : user_id为空的不参与关联

解决方法2 :赋与空值新的key值

结论:
方法2比方法1效率更好,不但io少了,而且作业数也少了。
解决方法1中 log读取两次,job是2。
解决方法2中 job数是1 。这个优化适合无效 id (比如 -99 , ”, null 等) 产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的rece上,解决数据倾斜问题。

(二)不同数据类型关联产生数据倾斜
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Recer中。

解决方法 :把数字类型转换成字符串类型

(三)小表不小不大,怎么用 map join 解决倾斜问题
使用 map join 解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。
解决如下:

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:

假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。

Ⅱ HIVE 列表型的str中的元素去重

只是去重的话可以用collect_set,,,如果还需要保持有序可能就需要用collect_list了

Ⅲ Hive常用算子实现原理简述--MapRece版

Hive中的常用算子包括distinct、join、group by、order by、distribute by、sort by、count等,这些操作符在SQL中使用起来很方便,能快速达到我们想要的效果,但是这些算子在底层是怎么实现的呢?

order by很容易想到执行原理,在一个rece中将所有记录按值排序即可。因此order by在数据量大的情况下执行时间非常长,容易out of memory,非特殊业务需求一般不使用。distribute by也比较明显,根据hash值将distribute的值分发到不同的rece。sort by是小号的order by,只负责将本recer中的值排序,达到局部有序的效果。sort by和distribute by配合使用风味更佳,二者可以合并简写为cluster by。count则更加明晰,在combiner或recer处按相同键累加值就能得到。

比较复杂的是distinct、join、group by,本文重点讨论这三个算子在MapRece引擎中的大致实现原理。班门弄斧,抛砖引玉。

map阶段,将group by后的字段组合作为key,如果group by单字段那么key就一个。将group by之后要进行的聚合操作字段作为值,如要进行count,则value是1;如要sum另一个字段,则value就是该字段。

shuffle阶段,按照key的不同分发到不同的recer。注意此时可能因为key分布不均匀而出现数据倾斜的问题。

rece阶段,将相同key的值累加或作其他需要的聚合操作,得到结果。

对group by的过程讲解的比较清楚的是这篇文章 http://www.mamicode.com/info-detail-2292193.html 图文并茂,很生动。

实例如下图,对应语句是 select rank, isonline, count(*) from city group by rank, isonline;

如果group by出现数据倾斜,除去替换key为随机数、提前挑出大数量级key值等通用调优方法,适用于group by的特殊方法有以下几种:

(1)set hive.map.aggr=true,即开启map端的combiner,减少传到recer的数据量,同时需设置参数hive.groupby.mapaggr.checkinterval 规定在 map 端进行聚合操作的条目数目。

(2)设置mapred.rece.tasks为较大数量,降低每个recer处理的数据量。

(3)set hive.groupby.skewindata=true,该参数可自动进行负载均衡。生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Rece 中,每个 Rece 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Rece 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Rece 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Rece中),最后完成最终的聚合操作。

Hive中有两种join方式:map join和common join

如果不显式指定map side join,或者没有达到触发自动map join的条件,那么会进行rece端的join,即common join,这种join包含map、shuffle、rece三个步骤。

(1)Map阶段

读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key。Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表。然后按照key进行排序。

(2)Shuffle阶段

根据key的值进行hash,并将key/value按照hash值推送至不同的rece中,这样确保两个表中相同的key位于同一个rece中

(3)Rece阶段

根据key的值完成join操作,期间通过Tag来识别不同表中的数据。

以下面的SQL为例,可用下图所示过程大致表达其join原理。

SELECT u.name, o.orderid FROM user u JOIN order o ON u.uid = o.uid;

关联字段是uid,因此以uid为map阶段的输出key,value为选取的字段name和标记源表的tag。shuffle阶段将相同key的键值对发到一起,rece阶段将不同源表、同一key值的记录拼接起来,可能存在一对多的情况。

如果指定使用map join的方式,或者join的其中一张表小于某个体积(默认25MB),则会使用map join来执行。具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定。

Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true。

以下图为例说明map join如何执行,该图来自 http://lxw1234.com/archives/2015/06/313.htm ,博主是一个水平深厚又乐于分享的前辈,图片水印上也有其网址。

yarn会启动一个Local Task(在客户端本地执行的Task)--Task A,负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。

接下来是Task B,该任务是一个没有Rece的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

由于MapJoin没有Rece,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。

distinct一般和group by同时出现。

当distinct一个字段时,将group by的字段和distinct的字段组合在一起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这可以确保相同group by字段的记录都分到同一个recer,并且map的输入天然就是按照组合key排好序的。根据分区键将记录分发到rece端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到一个不同值,计数器就自增1,即可得到count distinct结果。例如下面的SQL语句,过程可以下图示意。

我暂时没有理解这是怎么实现的,别人写的也没有看明白。有善良的学富五车的大佬指点一下吗?

Ⅳ Hive sql及窗口函数

hive函数:

1、根据指定条件返回结果:case when then else end as

2、基本类型转换:CAST()

3、nvl:处理空字段:三个str时,是否为空可以指定返回不同的值

4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp

5、count(1)与COUNT(*):返回行数

如果表没有主键,那么count(1)比count(*)快;

如果有主键,那么count(主键,联合主键)比count(*)快;

count(1)跟count(主键)一样,只扫描主键。count(*)跟count(非主键)一样,扫描整个表。明显前者更快一些。

性能问题:

1.任何情况下SELECT COUNT(*) FROM tablename是最优选择,(指没有where的情况);

2.尽量减少SELECT COUNT(*) FROM tablename WHERE COL = ‘value’ 这种查询;

3.杜绝SELECT COUNT(COL) FROM tablename WHERE COL2 = ‘value’ 的出现。

count(expression):查询 is_reply=0 的数量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;

6、distinct与group by

distinct去重所有distinct之后所有的字段,如果有一个字段值不一致就不作为一条

group by是根据某一字段分组,然后查询出该条数据的所需字段,可以搭配 where max(time)或者Row_Number函数使用,求出最大的一条数据

7、使用with 临时表名 as() 的形式,简单的临时表直接嵌套进sql中,复杂的和需要复用的表写到临时表中,关联的时候先找到关联字段,过滤条件最好在临时表中先过滤后关联

处理json的函数:

split(json_array_string(schools), '\\|\\|') AS schools

get_json_object(school, '$.id') AS school_id,

字符串函数:

1、instr(’源字符串’ , ‘目标字符串’ ,’开始位置’,’第几次出现’)

instr(sourceString,destString,start,appearPosition)

1.sourceString代表源字符串; destString代表要从源字符串中查找的子串;

2.start代表查找的开始位置,这个参数可选的,默认为1;

3.appearPosition代表想从源字符中查找出第几次出现的destString,这个参数也是可选的, 默认为1

4.如果start的值为负数,则代表从右往左进行查找,但是位置数据仍然从左向右计算。

5.返回值为:查找到的字符串的位置。如果没有查找到,返回0。

最简单例子: 在abcd中查找a的位置,从第一个字母开始查,查找第一次出现时的位置

select instr(‘abcd’,’a’,1,1) from al; —1

应用于模糊查询:instr(字段名/列名, ‘查找字段’)

select code,name,dept,occupation from staff where instr(code, ‘001’)> 0;

等同于 select code, name, dept, occupation from staff where code like ‘%001%’ ;

应用于判断包含关系:

select ccn,mas_loc from mas_loc where instr(‘FH,FHH,FHM’,ccn)>0;

等同于 select ccn,mas_loc from mas_loc where ccn in (‘FH’,’FHH’,’FHM’);

2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

substr(time,1,8) 表示将time从第1位开始截取,截取的长度为8位

第一种用法:

substr(string A,int start)和 substring(string A,int start),用法一样

功效:返回字符串A从下标start位置到结尾的字符串

第二种用法:

substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

功效:返回字符串A从下标start位置开始,长度为len的字符串

3、get_json_object(form_data,'$.学生姓名') as student_name

json_tuple 函数的作用:用来解析json字符串中的多个字段

4、split(full_name, '\\.') [5] AS zq;  取的是数组里的第六个

日期(时间)函数:

1、to_date(event_time) 返回日期部分

2、date_sub:返回当前日期的相对时间

当前日期:select curdate() 

当前日期前一天:select  date_sub(curdate(),interval 1 day)

当前日期后一天:select date_sub(curdate(),interval -1 day)

date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14)  将现在的时间总秒数转为标准格式时间,返回14天之前的时间

时间戳>>>>日期:

from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 将现在的时间总秒数转为标准格式时间

from_unixtime(get_json_object(get_json_object(form_data,'$.挽单时间'),'$.$date')/1000) as retain_time

unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')  --1565858400

日期>>>>时间戳:unix_timestamp()

date_format:yyyy-MM-dd HH:mm:ss 时间转格式化时间

select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000

1.日期比较函数: datediff语法: datediff(string enddate,string startdate) 

返回值: int 

说明: 返回结束日期减去开始日期的天数。 

举例:  hive> select datediff('2016-12-30','2016-12-29');  1

2.日期增加函数: date_add语法: date_add(string startdate, intdays) 

返回值: string 

说明: 返回开始日期startdate增加days天后的日期。 

举例:  hive>select date_add('2016-12-29',10);  2017-01-08

3.日期减少函数: date_sub语法: date_sub (string startdate,int days) 

返回值: string 

说明: 返回开始日期startdate减少days天后的日期。 

举例:  hive>select date_sub('2016-12-29',10);  2016-12-19

4.查询近30天的数据

select * from table where datediff(current_timestamp,create_time)<=30;

create_time 为table里的字段,current_timestamp 返回当前时间 2018-06-01 11:00:00

3、trunc()函数的用法:当前日期的各种第一天,或者对数字进行不四舍五入的截取

日期:

1.select trunc(sysdate) from al  --2011-3-18  今天的日期为2011-3-18

2.select trunc(sysdate, 'mm')   from   al  --2011-3-1    返回当月第一天.

上月1号    trunc(add_months(current_date(),-1),'MM')

3.select trunc(sysdate,'yy') from al  --2011-1-1       返回当年第一天

4.select trunc(sysdate,'dd') from al  --2011-3-18    返回当前年月日

5.select trunc(sysdate,'yyyy') from al  --2011-1-1   返回当年第一天

6.select trunc(sysdate,'d') from al  --2011-3-13 (星期天)返回当前星期的第一天

7.select trunc(sysdate, 'hh') from al   --2011-3-18 14:00:00   当前时间为14:41  

8.select trunc(sysdate, 'mi') from al  --2011-3-18 14:41:00   TRUNC()函数没有秒的精确

数字:TRUNC(number,num_digits) Number 需要截尾取整的数字。Num_digits 的默认值为 0。TRUNC()函数截取时不进行四舍五入

11.select trunc(123.458,1) from al --123.4

12.select trunc(123.458,-1) from al --120

4、round():四舍五入:

select round(1.455, 2)  #结果是:1.46,即四舍五入到十分位,也就是保留两位小数

select round(1.5)  #默认四舍五入到个位,结果是:2

select round(255, -1)  #结果是:260,即四舍五入到十位,此时个位是5会进位

floor():地板数

ceil()天花板数

5、

6.日期转年函数: year语法:   year(string date) 

返回值: int

说明: 返回日期中的年。

举例:

hive>   select year('2011-12-08 10:03:01') from al;

2011

hive>   select year('2012-12-08') fromal;

2012

7.日期转月函数: month语法: month   (string date) 

返回值: int

说明: 返回日期中的月份。

举例:

hive>   select month('2011-12-08 10:03:01') from al;

12

hive>   select month('2011-08-08') fromal;

8

8.日期转天函数: day语法: day   (string date) 

返回值: int

说明: 返回日期中的天。

举例:

hive>   select day('2011-12-08 10:03:01') from al;

8

hive>   select day('2011-12-24') fromal;

24

9.日期转小时函数: hour语法: hour   (string date) 

返回值: int

说明: 返回日期中的小时。

举例:

hive>   select hour('2011-12-08 10:03:01') from al;

10

10.日期转分钟函数: minute语法: minute   (string date) 

返回值: int

说明: 返回日期中的分钟。

举例:

hive>   select minute('2011-12-08 10:03:01') from al;

3

11.日期转秒函数: second语法: second   (string date) 

返回值: int

说明: 返回日期中的秒。

举例:

hive>   select second('2011-12-08 10:03:01') from al;

1

12.日期转周函数: weekofyear语法:   weekofyear (string date) 

返回值: int

说明: 返回日期在当前的周数。

举例:

hive>   select weekofyear('2011-12-08 10:03:01') from al;

49

查看hive表在hdfs中的位置:show create table 表名;

在hive中hive2hive,hive2hdfs:

HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;

Hive ----> Hdfs、本地:使用:insert overwrite | local

网站访问量统计:

uv:每用户访问次数

ip:每ip(可能很多人)访问次数

PV:是指页面的浏览次数

VV:是指你访问网站的次数

sql:

基本函数:

count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正则)

and、or、not、in   

where、group by、having、{ join on 、full join}  、order by(desc降序)

sort by需要与distribut by集合结合使用:

hive (default)> set maprece.job.reces=3;  //先设置rece的数量 

insert overwrite local directory '/opt/mole/datas/distribute-by'

row format delimited fields terminated by '\t'

先按照部门编号分区,再按照员工编号降序排序。

select * from emp distribute by deptno sort by empno desc;

外部表  create external table if not exists dept

分区表:create table dept_partition ( deptno int, dname string, loc string )  partitioned by ( month string )

load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809'); 

 alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');

多分区联合查询:union

select * from dept_partition2 where month='201809' and day='10';

show partitions dept_partition;

desc formatted dept_partition;

二级分区表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';

分桶抽样查询:分区针对的是数据的存储路径;分桶针对的是数据文件

create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';

设置开启分桶与rece为1:

set hive.enforce.bucketing=true;

set maprece.job.reces=-1;

分桶抽样:select * from stu_bucktablesample(bucket x out of y on id);

抽取,桶数/y,x是从哪个桶开始抽取,y越大 抽样数越少,y与抽样数成反比,x必须小于y

给空字段赋值:

如果员工的comm为NULL,则用-1代替或用其他字段代替  :select nvl(comm,-1) from emp;

case when:如何符合记为1,用于统计、分组统计

select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;

用于组合归类汇总(行转列):UDAF:多转一

concat:拼接查询结果

collect_set(col):去重汇总,产生array类型字段,类似于distinct

select t.base, concat_ws('|',collect_set(t.name))   from (select concat_ws(',',xingzuo,blood_type) base,name  from person_info) t group by t.base;

解释:先第一次查询得到一张没有按照(星座血型)分组的表,然后分组,使用collect_set将名字组合成数组,然后使用concat将数组变成字符串

用于拆分数据:(列转行):UDTF:一转多

explode(col):将hive一列中复杂的array或者map结构拆分成多行。

lateral view  侧面显示:用于和UDTF一对多函数搭配使用

用法:lateral view udtf(expression) tablealias as cate

cate:炸开之后的列别名

temptable :临时表表名

解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。

开窗函数:

Row_Number,Rank,Dense_Rank  over:针对统计查询使用

Row_Number:返回从1开始的序列

Rank:生成分组中的排名序号,会在名词s中留下空位。3 3 5

dense_rank:生成分组中的排名序号,不会在名词中留下空位。3 3 4

over:主要是分组排序,搭配窗口函数使用

结果:

SUM、AVG、MIN、MAX、count

preceding:往前

following:往后

current row:当前行

unbounded:unbounded preceding 从前面的起点, unbounded following:到后面的终点

sum:直接使用sum是总的求和,结合over使用可统计至每一行的结果、总的结果、当前行+之前多少行/之后多少行、当前行到往后所有行的求和。

over(rowsbetween 3/current )  当前行到往后所有行的求和

ntile:分片,结合over使用,可以给数据分片,返回分片号

使用场景:统计出排名前百分之或n分之一的数据。

lead,lag,FIRST_VALUE,LAST_VALUE

lag与lead函数可以返回上下行的数据

lead(col,n,dafault) 用于统计窗口内往下第n行值

第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL)

LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值

第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL)

使用场景:通常用于统计某用户在某个网页上的停留时间

FIRST_VALUE:取分组内排序后,截止到当前行,第一个值

LAST_VALUE:取分组内排序后,截止到当前行,最后一个值

范围内求和: https://blog.csdn.net/happyrocking/article/details/105369558

cume_dist,percent_rank

–CUME_DIST :小于等于当前值的 行数 / 分组内总行数

–比如,统计小于等于当前薪水的人数,占总人数的比例

percent_rank:分组内当前行的RANK值-1/分组内总行数-1

总结:

在Spark中使用spark sql与hql一致,也可以直接使用sparkAPI实现。

HiveSql窗口函数主要应用于求TopN,分组排序TopN、TopN求和,前多少名前百分之几。

与Flink窗口函数不同。

Flink中的窗口是用于将无线数据流切分为有限块处理的手段。

window分类:

CountWindow:按照指定的数据条数生成一个 Window,与时间无关。

TimeWindow:按照时间生成 Window。

1. 滚动窗口(Tumbling Windows):时间对齐,窗口长度固定,不重叠::常用于时间段内的聚合计算

2.滑动窗口(Sliding Windows):时间对齐,窗口长度固定,可以有重叠::适用于一段时间内的统计(某接口最近 5min 的失败率来报警)

3. 会话窗口(Session Windows)无时间对齐,无长度,不重叠::设置session间隔,超过时间间隔则窗口关闭。

Ⅳ 数据分析课程笔记 - 19 - HiveSQL 常用优化技巧

大家好呀,这节课学习 HiveSQL 的常用优化技巧。由于 Hive 主要用来处理非常大的数据,运行过程由于通常要经过 MapRece 的过程,因此不像 MySQL 一样很快出结果。而使用不同方法写出来的 HiveSQL 语句执行效率也是不一样的,因此为了减少等待的时间,提高服务器的运行效率,我们需要在 HiveSQL 的语句上进行一些优化。

本节课的主要内容

引言
1、技巧一:列裁剪和分区裁剪
(1)列裁剪
(2)分区裁剪
2、技巧二:排序技巧——sort by代替order by
3、技巧三:去重技巧——用group by来替换distinct
4、技巧四:聚合技巧——grouping sets、cube、rollup
(1)grouping sets
(2)cube
(3)rollup
5、技巧五:换个思路解题
6、技巧六:union all时可以开启并发执行
7、技巧七:表连接优化
8、技巧八:遵循严格模式

Hive 作为大数据领域常用的数据仓库组件,在平时设计和查询时要特别注意效率。影响Hive效率的几乎从不是数据量过大,而是数据倾斜、数据冗余、job 或 I/O 过多、MapRece 分配不合理等等。对 Hive 的调优既包含对HiveSQL 语句本身的优化,也包含 Hive 配置项和 MR 方面的调整。

列裁剪就是在查询时只读取需要的列。当列很多或者数据量很大时,如果select 所有的列或者不指定分区,导致的全表扫描和全分区扫描效率都很低。Hive中与列裁剪优化相关的配置项是 hive.optimize.cp ,默认是 true 。

分区裁剪就是在查询时只读需要的分区。Hive中与分区裁剪优化相关的则是 hive.optimize.pruner ,默认是 true 。

HiveSQL中的 order by 与其他 SQL 语言中的功能一样,就是将结果按某个字段全局排序,这会导致所有map端数据都进入一个 rece 中,在数据量大时可能会长时间计算不完。

如果使用 sort by ,那么就会视情况启动多个 recer 进行排序,并且保证每个 recer 内局部有序。为了控制 map 端数据分配到 rece 的 key,往往还要配合 distribute by 一同使用。如果不加 distribute by 的话,map 端数据就会随机分配给 recer。

这里需要解释一下, distribute by 和 sort by 结合使用是如何相较于 order by 提升运行效率的。

假如我们要对一张很大的用户信息表按照年龄进行分组,优化前的写法是直接 order by age 。使用 distribute by 和 sort by 结合进行优化的时候, sort by 后面还是 age 这个排序字段, distribute by 后面选择一个没有重复值的均匀字段,比如 user_id 。

这样做的原因是,通常用户的年龄分布是不均匀的,比如20岁以下和50岁以上的人非常少,中间几个年龄段的人又非常多,在 Map 阶段就会造成有些任务很大,有些任务很小。那通过 distribute by 一个均匀字段,就可以让系统均匀地进行“分桶”,对每个桶进行排序,最后再组合,这样就能从整体上提升 MapRece 的效率。

取出 user_trade 表中全部支付用户:

原有写法的执行时长:

优化写法的执行时长:

考虑对之前的案例进行优化:

注意: 在极大的数据量(且很多重复值)时,可以先 group by 去重,再 count() 计数,效率高于直接 count(distinct **) 。

如果我们想知道用户的性别分布、城市分布、等级分布,你会怎么写?

通常写法:

缺点 :要分别写三次SQL,需要执行三次,重复工作,且费时。

那该怎么优化呢?

注意 :这个聚合结果相当于纵向地堆在一起了(Union all),分类字段用不同列来进行区分,也就是每一行数据都包含 4 列,前三列是分类字段,最后一列是聚合计算的结果。

GROUPING SETS() :在 group by 查询中,根据不同的维度组合进行聚合,等价于将不同维度的 group by 结果集进行 union all。聚合规则在括号中进行指定。

如果我们想知道用户的性别分布以及每个性别的城市分布,你会怎么写?

那该怎么优化呢?

注意: 第二列为NULL的,就是性别的用户分布,其余有城市的均为每个性别的城市分布。

cube:根据 group by 维度的所有组合进行聚合

注意 :跑完数据后,整理很关键!!!

rollup:以最左侧的维度为主,进行层级聚合,是cube的子集。

如果我想同时计算出,每个月的支付金额,以及每年的总支付金额,该怎么办?

那应该如何优化呢?

条条大路通罗马,写SQL亦是如此,能达到同样效果的SQL有很多种,要学会思路转换,灵活应用。

来看一个我们之前做过的案例:

有没有别的写法呢?

Hive 中互相没有依赖关系的 job 间是可以并行执行的,最典型的就是
多个子查询union all。在集群资源相对充足的情况下,可以开启并
行执行。参数设置: set hive.exec.parallel=true;

时间对比:

所谓严格模式,就是强制不允许用户执行3种有风险的 HiveSQL 语句,一旦执行会直接报错。

要开启严格模式,需要将参数 hive.mapred.mode 设为 strict 。

好啦,这节课的内容就是这些。以上优化技巧需要大家在平时的练习和使用中有意识地去注意自己的语句,不断改进,就能掌握最优的写法。

Ⅵ hive数组去重

把数据放到collect_set里面就好了

Ⅶ 使用sqoop将hive中的数据导入关系型数据库怎么去重

直接导入hive表
sqoop import --connect jdbc:postgresql://ip/db_name--username user_name --table table_name --hive-import -m 5
内部执行实际分三部,.将数据导入hdfs(可在hdfs上找到相应目录),2.创建hive表名相同的表,3,将hdfs上数据传入hive表中
sqoop根据postgresql表创建hive表
sqoop create-hive-table --connect jdbc:postgresql://ip/db_name --username user_name --table table_name --hive-table hive_table_name( --hive-partition-key partition_name若需要分区则加入分区名称)
导入hive已经创建好的表中
sqoop import --connect jdbc:postgresql://ip/db_name --username user_name --table table_name --hive-import -m 5 --hive-table hive_table_name (--hive-partition-key partition_name --hive-partition-value partititon_value);
使用query导入hive表
sqoop import --connect jdbc:postgresql://ip/db_name --username user_name --query "select ,* from retail_tb_order where \$CONDITIONS" --hive-import -m 5 --hive-table hive_table_name (--hive-partition-key partition_name --hive-partition-value partititon_value);
注意:$CONDITIONS条件必须有,query子句若用双引号,则$CONDITIONS需要使用\转义,若使用单引号,则不需要转义。

Ⅷ hive如何去掉重复数据,显示第一条

去重没问题啊,distinct 就好了,显示第一条可以用row_number函数,不同版本hive的row_number有所不同,你自己查一下吧,row_number可以对同一个key从1开始编号的。

Ⅸ Hive优化

注意:以下SQL不会转为Maprece来执行,Explain用于显示执行计划,可以来验证sql是否发生maprece

        select仅查询本表字段;

        where仅对本表字段做条件过滤;

比如下面的语句是会发生maprece的;(下面的rece没有截图)

        (1)集群模式:hive默认采用的是集群的方式;

        (2)本地模式:首先开启本地模式,测试的时候就可以以本地模式来节省集群资源;

                set hive.exec.mode.local.auto=true;

        注意:hive.exec.mode.local.auto.inputbytes.max默认值为128M表示加载文件的最大值,若大于该配置仍会以集群方式来运行;

通过设置以下参数开启并行模式:默认是不开启并行计算,这是job是线性执行的;

set hive.exec.parallel=true;多个job并行执行提高效率;

注意:hive.exec.parallel.thread.number(一次SQL计算中允许并行执行的job个数的最大值);

通过设置以下参数开启严格模式:

set hive.mapred.mode=strict;(默认为:nonstrict非严格模式)

开启严格模式后,查询会有限制:

    (1)对于分区表,必须添加where对于分区字段的条件过滤,因为hive中的数据量一般都很大,避免全表扫描不添加会执行失败,非分区表正常查询;

    (2)order by语句必须包含limit输出限制;还是为了避免全表扫描

    (3)限制执行笛卡尔积的查询。

(1)Order By - 对于查询结果做全排序,只允许有一个rece处理(当数据量较大时,应慎用。严格模式下,必须结合limit来使用);

(2)Sort  By - 对于单个rece的数据进行排序

(3)Distribute By - 分区排序,经常和Sort By结合使用

(4)Cluster By - 相当于 Sort By +Distribute By(Cluster By不能通过asc、desc的方式指定排序规则;可通过 distribute by column sort by column asc|desc 的方式)

Join计算时,将小表(驱动表)放在join的左边

Map Join:在Map端完成Join,join操作对应maprece是rece阶段,因为shuffle,跟rece阶段比较浪费时间,所以才有了map  join;

两种实现方式:

(1)SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)

(2)开启自动mapjoin : 通过修改以下配置启用自动的map join:

set hive.auto.convert.join = true;(该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用Map

join)

相关配置参数:

hive.mapjoin.smalltable.filesize; (大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行)

hive.ignore.mapjoin.hint;(默认值:true;是否忽略mapjoin hint 即mapjoin标记)

hive.auto.convert.join.noconditionaltask;(默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin)

hive.auto.convert.join.noconditionaltask.size;(将多个mapjoin转化为一个mapjoin时,其表的最大值)    

通过设置以下参数开启在Map端的聚合:

set hive.map.aggr=true; 开启后,map预聚合,相当于map端rece减轻rece 端压力;

相关配置参数:

hive.groupby.mapaggr.checkinterval:map端group by执行聚合时处理的多少行数据(默认:100000)

hive.map.aggr.hash.min.rection:进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)

hive.map.aggr.hash.percentmemory:map端聚合使用的内存的最大值

hive.map.aggr.hash.force.flush.memory.threshold:map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush

hive.groupby.skewindata: 是否对GroupBy产生的数据倾斜做优化,默认为false,当设置为true时,会进行两次mr,第一次把数据map端随机分配分区,达到均衡数据的目的,第二次进行正常的分区算法执行mr;

文件数目小,容易在文件存储端造成压力,给hdfs造成压力,影响效率

设置合并属性

        是否合并map输出文件:hive.merge.mapfiles=true

        是否合并rece输出文件:hive.merge.mapredfiles=true;

        合并文件的大小:hive.merge.size.per.task=256*1000*1000

去重统计

        数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Rece

Task来完成,这一个Rece需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP

BY再COUNT的方式替换;

由于maptask的数量一般跟切片数量有关,所有我们主要对rece端设置数量;

Map数量相关的参数

        mapred.max.split.size: 一个split的最大值,即每个map处理文件的最大值

        mapred.min.split.size.per.node:一个节点上split的最小值

        mapred.min.split.size.per.rack:一个机架上split的最小值

Rece数量相关的参数

        mapred.rece.tasks:强制指定rece任务的数量

        hive.exec.recers.bytes.per.recer:每个rece任务处理的数据量

        hive.exec.recers.max:每个任务最大的rece数

适用场景:

        (1)小文件个数过多

        (2)task个数过多

通过 set mapred.job.reuse.jvm.num.tasks=n; 来设置(n为task插槽个数);个人理解优点类似于各种连接池的作用;

缺点:设置开启之后,task插槽会一直占用资源,不论是否有task运行,直到所有的task即整个job全部执行完成时,才会释放所有的task插槽资源!

Ⅹ hive优化方法

1、列裁剪和分区裁剪

2、谓词下推

3、sort by 替换order by 

4、group by 代替distinct

5、group by 配置调整

map 端预聚合:set hive.map.aggr=true ; set hive.groupby.mapaggr.checkinterval=100000

倾斜均衡配置项:set hive.groupby.skewindate=true

6、join优化

6.1 大小表,小表前置

6.2 多表Join时key相同

6.3 利用mapjoin特性

6.4 分桶表 mapjoin

6.5 倾斜均衡配置项:

设置hive.optimize.skewjoin=true,开启后,在join过程中hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.task参数还可以控制第二个job的mapper数量,默认10000.

6.7 优化sql处理join数据倾斜

6.7.1、空值或无意义的值:若不需要空值数据,就提前写到where语句过滤掉,如果需要保留的话,将空值key用随机方式打散。

6.7.2、单独处理倾斜key

6.7.3、不同数据类型,join的关联字段类型不一样,导致耗时长,所以需要注意做类型转换(join关联字段,一个是int,一个是string,,计算key的hash值时默认时int类型做的,这样导致所有真正的string类型的key都分配到一个recer上了)

6.7.4、mapjoin的小表比较大的时候,,无法直接使用mapjoin,则 select/*+mapjoin(b)*/  from  a left join ( select /*+mapjoin (b)*/ from b inner join a )

6.8、maprece 优化

6.8.1、调整mapper数

mapper数量与输入文件的split数息息相关。如果想减少mapper数,就适当提高mapred.min.split.size,split数就减少了;如果想增大mapper数,除了降低maperd.min.split.size之外,也可以提高mapred.map.task;一般来讲,如果输入文件是少量大文件,就减少mapper数;如果是大量非小文件,就增大mapper数,如果是小文件,就喝吧小文件

6.8.2 调整recer数

使用参数mapred.rece.task可以直接设定recer数量,如果不设置的话,hive会自行推测,推测逻辑如下:

参数hive.exec.recers.bytes.per.recer.设定每个recer能够处理的最大的数据量,默认时1G

参数hive.exec.recers.max设定每个job的最大的recer数量,默认时999或者1009

得到recer数:recers_num=min(total_input_size/recers.bytes.per.recer,recers.max)

recer数量与输出文件的数量相关,如果recer数太多,会产生大量小文件,对hdfs造成压力,如果recer数太少,每个recer要处理很多数据,容易拖慢运行时间或造成oom

6.8.3 合并小文件

输入阶段合并:需要更改hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.hiveiputformat,我们该成org.apache.hadoop.hive.ql.io.combinehiveinputformat.

这样比调整mapper数时,又多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现又split大小小于这两个默认值100MB,则会进行合并

输出阶段合并:直接将hive.merge.mapfiles和hive.merge.mapredfiles都设置为true即可。

hive.merge.mapfiles表示将map-only任务的输出合并

hive.merge.mapredfiles表示将map-rece任务的输出合并

另外hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值

hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认时1G

6.9 启用压缩

压缩job的中间结果数据和输出数据,可以少量CPU时间节省出很多空间。压缩方式一般选择snappy,效率最高,要启用中间压缩,需要设定:

hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.code为org.apache.hadoop.io.compress.snappycode.

另外,参数hive.intermediate.compression.type可以选对块block还是对记录record压缩,block压缩率比较高,输出压缩的配置基本相同:打开hive.exec.compress.output

6.10 jvm重用

在mr job中,默认时每执行一个task就会启动一个jvm,如果task非常小而且碎,那么jvm启动和关闭耗时都会比较长。可以通过调节参数mapred.job.reuse.jvm.num.task来重用。例如将这个参数设置为5,那么久代表同一个mr job中顺序执行的5各task可以重复使用一个jvm,减少启动和关闭开销,但是他对不同mr job的task无效

6.11 采用合适的存储格式

6.12 数据倾斜

数据倾斜原因:1、join 倾斜 2、聚合倾斜

group by 倾斜: group by 字段中某个字段的值过多,导致处理这个值得rece耗时很久

解决方法:set hive.map.aggr=true;  -- 表示开启map端聚合

set hive.groupby.skewindata=true;  注意:只能对单个字段聚合 , 生成两个map job ,第一个map job中map输出结果随机分布到rece中,每个rece做部分聚合操作,并输出结果,这样相同的groub key有可能被分发到不同的rece中,从而达到负载均衡的目的;第二个map job再根据预处理的数据结果按照group by key分布到rece中,这个过程可以保证相同的key被分到同一个rece中,最后完成最终的聚合操作

6.13 优化in、exists语句

使用left semi join 替换in 、exists

6.14 合并 maprece操作

multi-group by 是hive的一个非常好的特性,它使得hive中利用中间结果变更非常方便

例如:

from ( select a.status,b.school from status_update a join profilees b on (a.userid=b.suerid)) subq1

insert overwirte table a1 partition(ds='2021-03-07') 

select subq1.school,count(1) group by subq1.school

insert overwrite table b1 partition(ds='2021-03-07')

select subq1.status,count(1) group by subq1.status

上述语句使用了multi-group by特性联系group by 2次数据,使用不同的group by key,这一特性可以减少一次maprece操作

阅读全文

与hive去重distinct大数据相关的资料

热点内容
桌面的文件怎么排成一排 浏览:846
wow锻造怎么升级 浏览:338
选编程和学ps哪个好 浏览:447
他和谁睡了主演 浏览:944
彩票过滤软件用什么语言编程好 浏览:637
wps如何把文件变成excel 浏览:577
红头文件word标题 浏览:788
苹果充电宝哪个牌子好 浏览:119
测序数据里fast是什么 浏览:576
压缩文件怎么改名不会损坏 浏览:188
互相看手机的一部电影 浏览:842
筛选文件内容怎么筛 浏览:884
工业机器人示教编程是指什么 浏览:566
在线能看的资源电影 浏览:308
734什么错误代码 浏览:220
淘宝传文件软件 浏览:310
韩国电影国语发音 浏览:423
第365章天资榜叶凌天 浏览:864
大胸乳房电影 浏览:200
开网络医院需要什么手续 浏览:595

友情链接