When we first start learning Hive, we are told that it reduces the learning and development cost for programmers. Concretely, it does this by converting SQL statements into MapReduce programs and running them.

However, some Hive queries do not need MapReduce at all. For example: `SELECT * FROM employees;`. In this case, Hive can simply read the files under the storage directory of the `employees` table and print the query results to the console.
In the hive-default.xml.template file, the property hive.fetch.task.conversion defaults to more (older versions of Hive defaulted to minimal). Once the property is set to more, full-table scans, single-column lookups, LIMIT queries, and the like no longer run MapReduce.
<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Expects one of [none, minimal, more].
    Some select queries can be converted to single FETCH task minimizing latency.
    Currently the query should be single sourced not having any subquery and
    should not have any aggregations or distincts (which incurs RS), lateral
    views and joins.
    0. none : disable hive.fetch.task.conversion
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
  </description>
</property>
Let's run a few hands-on exercises and see the effect!

Case practice <1> Set hive.fetch.task.conversion to none, then execute the query statements; each of them will run a MapReduce job.
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from score;
hive (default)> select s_score from score;
hive (default)> select s_score from score limit 3;
<2> Set hive.fetch.task.conversion to more, then execute the query statements; none of the following queries will run a MapReduce job.
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from score;
hive (default)> select s_score from score;
hive (default)> select s_score from score limit 3;
We can clearly see that with hive.fetch.task.conversion set to none, every query goes through MapReduce, which takes a certain amount of time. But even with it set to more, only some SQL statements avoid MapReduce; the rest still run it. Is there any way to optimize those cases?
Most Hadoop jobs need the full scalability Hadoop provides in order to process large data sets. Sometimes, though, Hive's input data volume is very small. In those cases, the overhead of launching a distributed job for the query can far exceed the actual execution time. For most such cases, Hive can run all tasks on a single machine in local mode. For small data sets, local mode can significantly reduce execution time.

Users can set the value of hive.exec.mode.local.auto to true to let Hive start this optimization automatically when appropriate.
set hive.exec.mode.local.auto=true;  -- enable local MR
Set the maximum input data size for local MR: when the total input size is below this value, local MR is used. The default is 134217728, i.e. 128 MB:
set hive.exec.mode.local.auto.inputbytes.max=134217728;

Set the maximum number of input files for local MR: when the number of input files is below this value, local MR is used. The default is 4:
set hive.exec.mode.local.auto.input.files.max=4;
Case practice:

<1> Enable local mode and execute the query statement:
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from score cluster by s_id;
18 rows selected (1.568 seconds)
<2> Disable local mode and execute the same query statement:
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from score cluster by s_id;
18 rows selected (11.865 seconds)
By default, all map-stage data with the same key is distributed to a single reducer; when one key's data volume is too large, the data is skewed.

Not all aggregations have to be completed on the reduce side. Many aggregations can first be partially aggregated on the map side, with the final result computed on the reduce side.
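Map-side partial aggregation can be sketched in plain Python (an illustrative combiner-style sketch, not Hive's actual implementation; the splits and keys are hypothetical toy data):

```python
from collections import Counter

def map_side_aggregate(split):
    # Each mapper pre-aggregates its own split, so only one
    # (key, partial_count) pair per key leaves the map side.
    return Counter(split)

def reduce_side_merge(partials):
    # The reduce side only merges the small partial results.
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

splits = [["a", "a", "b"], ["a", "b", "b", "b"]]
merged = reduce_side_merge(map_side_aggregate(s) for s in splits)
# merged == Counter({"b": 4, "a": 3})
```

The key point is that the shuffle carries one pre-aggregated pair per key per mapper instead of every raw row, which is exactly what hive.map.aggr enables.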
<1> Enable map-side aggregation:
set hive.map.aggr = true;
<2> The number of entries over which aggregation is performed on the map side:
set hive.groupby.mapaggr.checkinterval = 100000;
<3> Enable load balancing when there is data skew (the default is false):
set hive.groupby.skewindata = true;
When this option is set to true, the generated query plan contains two MR jobs. In the first MR job, the map output is randomly distributed across the reducers; each reducer performs a partial aggregation and emits its result. Because rows with the same Group By key may land on different reducers, the load is balanced. The second MR job then distributes the pre-aggregated results to the reducers by Group By key (this time it is guaranteed that rows with the same Group By key go to the same reducer) and completes the final aggregation.
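The two-job plan can be sketched as follows (a minimal Python illustration under the assumption that a random spray to reducers stands in for the first job's shuffle; function name and data are hypothetical):

```python
import random
from collections import defaultdict

def skew_safe_count(rows, num_reducers=3, seed=0):
    rng = random.Random(seed)

    # Job 1: spray each row to a RANDOM reducer, so a hot key's rows are
    # spread evenly; each reducer computes a partial count.
    stage1 = [defaultdict(int) for _ in range(num_reducers)]
    for key in rows:
        stage1[rng.randrange(num_reducers)][key] += 1

    # Job 2: shuffle the (small) partial counts by the REAL key, so the
    # same Group By key lands on one reducer, and finish the aggregation.
    final = defaultdict(int)
    for partial in stage1:
        for key, cnt in partial.items():
            final[key] += cnt
    return dict(final)

rows = ["hot"] * 8 + ["cold"] * 2
assert skew_safe_count(rows) == {"hot": 8, "cold": 2}
```

No single reducer in job 1 ever sees all eight "hot" rows, which is the whole point of the rewrite.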
With small data volumes this does not matter, but with large data volumes the COUNT DISTINCT operation has to be completed by a single reduce task; that one reducer has too much data to process, which makes the whole job hard to finish. In general, COUNT DISTINCT is replaced with a GROUP BY followed by a COUNT:

Environment preparation:
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
load data local inpath '/home/admin/softwares/data/100 Ten thousand big table data （id Divide 10 integer ）/bigtable' into table bigtable;
set hive.exec.reducers.bytes.per.reducer=32123456;
SELECT count(DISTINCT id) FROM bigtable;
_c0
10000
Time taken: 35.49 seconds, Fetched: 1 row(s)
It can be rewritten as:

SELECT count(id) FROM (SELECT id FROM bigtable GROUP BY id) a;

Result:
Stage-Stage-1: Map: 1 Reduce: 4 Cumulative CPU: 13.07 sec HDFS Read: 120749896 HDFS Write: 464 SUCCESS
Stage-Stage-2: Map: 3 Reduce: 1 Cumulative CPU: 5.14 sec HDFS Read: 8987 HDFS Write: 7 SUCCESS
_c0
10000
Time taken: 51.202 seconds, Fetched: 1 row(s)
Although this takes one extra job to complete, with large data volumes it is definitely worth it.
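The rewrite works because the inner GROUP BY deduplicates in parallel across many reducers, leaving only a tiny input for the final count. A Python sketch (illustrative only; the hash partitioning stands in for Hive's shuffle, and the data is hypothetical):

```python
def distinct_count(ids, num_reducers=4):
    # Stage 1 (GROUP BY id): rows are shuffled by hash of the key, so
    # every reducer deduplicates its own disjoint slice in parallel.
    slices = [set() for _ in range(num_reducers)]
    for i in ids:
        slices[hash(i) % num_reducers].add(i)

    # Stage 2 (COUNT): the final single reducer only sees the already
    # deduplicated rows, not the full table.
    return sum(len(s) for s in slices)

ids = [n % 10000 for n in range(1_000_000)]
assert distinct_count(ids) == len(set(ids)) == 10000
```

A plain COUNT(DISTINCT id) would force one reducer to hold and deduplicate all one million rows by itself.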
Try to avoid Cartesian products, that is, avoid joins with no ON condition or with an invalid ON condition; Hive can only use one reducer to complete a Cartesian product.
In SELECT, take only the columns you need; when partitions exist, use partition filtering whenever possible, and use SELECT * less.
On the subject of partition pruning: when using an outer join, if the filter condition on the secondary table is written after WHERE, the full tables are joined first and the result is filtered afterwards. For example:
Environment preparation:
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
load data local inpath '/home/admin/softwares/data/ Increment by increment id Raw data /ori' into table ori;
load data local inpath '/home/admin/softwares/data/100 Ten thousand big table data （id Divide 10 integer ）/bigtable' into table bigtable;
Joining first, then filtering with WHERE:
SELECT a.id FROM bigtable a LEFT JOIN ori b ON a.id = b.id WHERE b.id <= 10;
The correct way is to write the condition after ON, so the filter is applied first and the join second (note that with an outer join, a filter placed in ON keeps unmatched rows from the preserved side with NULLs, so check that the semantics still match your intent):
SELECT a.id FROM ori a LEFT JOIN bigtable b ON (b.id <= 10 AND a.id = b.id);
Or write it as a subquery:
SELECT a.id FROM bigtable a RIGHT JOIN (SELECT id FROM ori WHERE id <= 10 ) b ON a.id = b.id;
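The benefit of pushing the filter below the join can be sketched in Python (a toy nested-loop join over hypothetical id lists; the counts illustrate the size of the intermediate result, not Hive's actual executor):

```python
def left_join(left, right):
    # Naive nested-loop LEFT JOIN on equal ids, returning (a, b) pairs.
    out = []
    for a in left:
        matches = [b for b in right if b == a]
        if matches:
            out.extend((a, b) for b in matches)
        else:
            out.append((a, None))
    return out

big = list(range(1_000))   # stands in for bigtable.id
ori = list(range(1_000))   # stands in for ori.id

# Join first, filter after (WHERE b.id <= 10): the join touches every row.
joined = left_join(big, ori)
late = [(a, b) for a, b in joined if b is not None and b <= 10]

# Filter first, join after (the subquery form): the join input is tiny.
early = left_join(big, [b for b in ori if b <= 10])
early = [(a, b) for a, b in early if b is not None]

assert late == early        # same answer...
assert len(joined) == 1_000 # ...but the late filter first built a full join
```

Both forms return the same 11 ids here, but the early filter only ever joins against 11 right-side rows.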
In relational databases, when inserting data into a partitioned table, the database automatically inserts each row into the corresponding partition based on the value of the partition column. Hive provides a similar mechanism, dynamic partitioning (Dynamic Partition), except that using Hive's dynamic partitioning requires the corresponding configuration. Using the partition scheme of the source table to match the partition scheme of the target table, all partitions of the source table can be copied to the target table; when the target table loads the data, there is no need to specify partitions explicitly, since the source table's partitions are used directly.
<1> Dynamic partition parameter settings:

① Enable dynamic partitioning:
set hive.exec.dynamic.partition=true;

② Set non-strict mode (the dynamic partitioning mode; the default is strict, which requires at least one partition field to be specified as a static partition; nonstrict allows all partition fields to use dynamic partitioning):
set hive.exec.dynamic.partition.mode=nonstrict;

③ The maximum number of dynamic partitions that can be created across all MR nodes:
set hive.exec.max.dynamic.partitions=1000;

④ The maximum number of dynamic partitions that can be created on each MR node. This parameter must be set according to the actual data. For example, if the source data contains a whole year of data, the day field has 365 distinct values, so this parameter must be set to more than 365; with the default value of 100, the job may fail:
set hive.exec.max.dynamic.partitions.pernode=100;

⑤ The maximum number of HDFS files that can be created in the whole MR job. On a Linux system, each user can start at most 1024 processes, and each process can open at most 2048 files, that is, hold 2048 file handles; the larger this value is set, the more file handles are consumed:
set hive.exec.max.created.files=100000;

⑥ Whether to throw an exception when an empty partition is generated. Usually this does not need to be set:
set hive.error.on.empty.partition=false;
<2> Case practice. Requirement: insert the ori_partitioned data by time (e.g. 20111231234568) into the corresponding partitions of the target table ori_partitioned_target.

① Prepare the source data table:
create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time bigint) row format delimited fields terminated by '\t';
load data local inpath '/export/servers/hivedatas/small_data' into table ori_partitioned partition (p_time='20111230000010');
load data local inpath '/export/servers/hivedatas/small_data' into table ori_partitioned partition (p_time='20111230000011');
② Create the target partitioned table:
create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';
③ Analysis: if we insert data by specifying a partition explicitly, as described before, this requirement is not easy to satisfy. This is where dynamic partitioning comes in:
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;

INSERT OVERWRITE TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time FROM ori_partitioned;
Note: specify the partition field names in PARTITION (month, day);
the last fields of the SELECT clause must correspond, in order, to the partition fields specified in PARTITION (month, day).
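The routing that dynamic partitioning performs can be sketched in Python (illustrative only; the rows and function name are hypothetical, and the dict keys stand in for HDFS partition directories):

```python
from collections import defaultdict

def dynamic_partition_insert(rows):
    # Hive-style dynamic partitioning (sketch): the LAST column of each
    # SELECTed row is taken as the partition value, and the row is routed
    # to that partition's directory automatically.
    partitions = defaultdict(list)
    for *cols, p_time in rows:
        partitions[f"p_time={p_time}"].append(tuple(cols))
    return dict(partitions)

rows = [
    (1, "u1", "20111230000010"),
    (2, "u2", "20111230000011"),
    (3, "u3", "20111230000010"),
]
out = dynamic_partition_insert(rows)
assert sorted(out) == ["p_time=20111230000010", "p_time=20111230000011"]
assert out["p_time=20111230000010"] == [(1, "u1"), (3, "u3")]
```

This is why the partition fields must come last in the SELECT clause and in the same order as in PARTITION (...): Hive reads the partition values positionally from the end of each row.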
Check the partitions:
hive> show partitions ori_partitioned_target;
OK
p_time=20111230000010
p_time=20111230000011
That's all for this share. In the future I'll bring you other installments in the Hive performance tuning series, so stay tuned! If you found it helpful, please remember to like it ٩(๑*ᴗ*๑)۶

This article participates in the Tencent Cloud media sharing plan; you are welcome to join and share.