Keven He 2022-05-14 14:33:38
Fetch: fetch conversion means that some Hive queries do not need a MapReduce computation. For example, for SELECT * FROM employees; Hive can simply read the files in the storage directory of the employees table and print the query result to the console.
In hive-default.xml.template, hive.fetch.task.conversion defaults to more; older Hive versions default to minimal. With the property set to more, full-table selects, column selects, LIMIT queries and similar statements do not run MapReduce.
<property>
  <name>hive.fetch.task.conversion</name>
  <value>more</value>
  <description>
    Expects one of [none, minimal, more].
    Some select queries can be converted to single FETCH task minimizing latency.
    Currently the query should be single sourced not having any subquery and should not have
    any aggregations or distincts (which incurs RS), lateral views and joins.
    0. none : disable hive.fetch.task.conversion
    1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
    2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
  </description>
</property>
Hands-on example:
1) Set hive.fetch.task.conversion to none and run the queries below. Each of them launches a MapReduce job.
set hive.fetch.task.conversion=none;
select * from emp;
select ename from emp;
select ename from emp limit 3;
2) Set hive.fetch.task.conversion to more and run the same queries. None of them launches a MapReduce job.
set hive.fetch.task.conversion=more;
select * from emp;
select ename from emp;
select ename from emp limit 3;
Most Hadoop jobs need the full scalability that Hadoop provides in order to process large data sets. Sometimes, however, the input to a Hive query is very small. In that case, launching the tasks for the query can take much longer than the actual execution of the job. For most such cases Hive can run all tasks on a single machine in local mode, which can significantly reduce execution time for small data sets.
Setting hive.exec.mode.local.auto to true lets Hive apply this optimization automatically when appropriate.
-- Enable local MR
set hive.exec.mode.local.auto=true;
-- Maximum input size for local MR. Local MR is used when the input is smaller than this value. The default is 134217728, that is, 128 MB.
set hive.exec.mode.local.auto.inputbytes.max=134217728;
-- Maximum number of input files for local MR. Local MR is used when the number of input files is smaller than this value. The default is 4.
set hive.exec.mode.local.auto.input.files.max=4;
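The two thresholds described above can be read as a simple eligibility check. The following is a minimal Python sketch of that check only. The function name `can_run_locally` is hypothetical and not a Hive API, the strict "smaller than" comparison follows the wording of the text, and Hive may apply further conditions that are not modelled here.

```python
# Defaults quoted in the text above.
DEFAULT_MAX_INPUT_BYTES = 134217728  # 128 MB
DEFAULT_MAX_INPUT_FILES = 4


def can_run_locally(file_sizes,
                    max_bytes=DEFAULT_MAX_INPUT_BYTES,
                    max_files=DEFAULT_MAX_INPUT_FILES):
    """Return True when both the total input size and the file count
    are below their thresholds (hypothetical helper, not Hive code)."""
    total_bytes = sum(file_sizes)
    return total_bytes < max_bytes and len(file_sizes) < max_files


# Two small files qualify; one 200 MB file or ten tiny files do not.
print(can_run_locally([1024, 2048]))
print(can_run_locally([200 * 1024 * 1024]))
print(can_run_locally([1] * 10))
```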
Hands-on example:
1) Enable local mode and run the query
set hive.exec.mode.local.auto=true;
select * from emp cluster by deptno;
2) Disable local mode and run the query
set hive.exec.mode.local.auto=false;
select * from emp cluster by deptno;
Put the table whose keys are relatively scattered and whose data volume is small on the left side of the join. This effectively reduces the chance of out-of-memory errors. Going further, a map join can load small dimension tables (fewer than 1,000 records) into memory first, so that the join completes on the map side without a reduce phase.
Findings from actual tests: newer Hive versions optimize both small table JOIN big table and big table JOIN small table, so it makes no noticeable difference whether the small table is on the left or on the right.
Test the efficiency of big table JOIN small table versus small table JOIN big table.
2. Create the big table, the small table, and the table that holds the JOIN result
-- Create the big table
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the small table
create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the table that holds the join result
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
3. Load data into the big table and the small table
load data local inpath '/opt/module/data/bigtable' into table bigtable;
load data local inpath '/opt/module/data/smalltable' into table smalltable;
4. Disable automatic map join (it is enabled by default)
set hive.auto.convert.join = false;
5. Run the small table JOIN big table statement
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s left join bigtable b on b.id = s.id;
6. Run the big table JOIN small table statement
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable b left join smalltable s on s.id = b.id;
1. Empty key filtering
Sometimes a join times out because certain keys carry too much data: rows with the same key are all sent to the same reducer, which then runs out of memory. In that case, analyze the offending keys carefully. In many cases the rows behind these keys are bad data and should be filtered out in the SQL statement, for example when the key field is empty. The steps are as follows:
(1) Configure the history server
Configure mapred-site.xml:
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>hadoop102:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>hadoop102:19888</value>
</property>
Start the history server:
sbin/mr-jobhistory-daemon.sh start historyserver
(2) Create the raw data table, the empty-id table, and the table that holds the join result
-- Create the raw data table
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the empty-id table
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the table that holds the join result
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
(3) Load the raw data and the empty-id data into their tables
load data local inpath '/opt/module/datas/SogouQ1.txt' into table ori;
load data local inpath '/opt/module/data/nullid' into table nullidtable;
(4) Test without filtering empty ids
insert overwrite table jointable
select n.* from nullidtable n left join ori o on n.id = o.id;
(5) Test with empty ids filtered out
insert overwrite table jointable
select n.* from (select * from nullidtable where id is not null) n left join ori o on n.id = o.id;
2. Empty key conversion
Sometimes an empty key corresponds to a lot of data, but that data is not bad data and must appear in the join result. In that case, assign a random value to the empty key field in table a, so that these rows are distributed randomly and evenly across the reducers.
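The effect of replacing empty keys with random values can be illustrated outside Hive. The sketch below is a Python simulation, not HiveQL: `reducer_for` is a hypothetical stand-in for hash partitioning, the reducer count of 5 and the `null_` prefix are assumptions chosen for the example, and the prefix only needs to guarantee that a salted key never equals a real id.

```python
import random
import zlib
from collections import Counter

NUM_REDUCERS = 5  # assumed value for the illustration


def reducer_for(key):
    """Deterministic stand-in for hash partitioning of a join key."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_REDUCERS


def salt_null(key, rng):
    """Replace an empty key with a random value that matches no real id."""
    return key if key is not None else "null_" + str(rng.random())


def reducer_load(keys):
    """Count how many rows each reducer receives."""
    return Counter(reducer_for(k) for k in keys)


rng = random.Random(42)
keys = [None] * 10000 + list(range(1000))  # many empty keys, few real ones
plain = reducer_load(keys)
salted = reducer_load(salt_null(k, rng) for k in keys)
print("max rows on one reducer without salting:", max(plain.values()))
print("max rows on one reducer with salting:", max(salted.values()))
```

Without salting, every empty key hashes to the same reducer. With salting, the empty-key rows spread over all reducers while rows with real ids are routed exactly as before.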
If MapJoin is not specified, or the conditions for MapJoin are not met, the Hive parser converts the join into a Common Join, that is, the join is completed in the Reduce phase, which is prone to data skew. With MapJoin, the small table is loaded entirely into memory and the join is done on the map side, so no reducer is needed.
1. Parameters for enabling MapJoin
(1) Select MapJoin automatically (the default is true)
set hive.auto.convert.join = true;
(2) Threshold that separates big tables from small tables (by default, a table under 25 MB is treated as a small table):
set hive.mapjoin.smalltable.filesize=25000000;
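How MapJoin works: a local task reads the small table and builds an in-memory hash table, which is made available to every mapper. Each map task then scans its part of the big table, probes the hash table, and writes the joined rows directly, with no reduce phase.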
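The core idea behind a map-side join is an in-memory hash join: build a lookup table from the small side, then stream the big side past it. The following Python sketch shows only that idea for an inner join. The function name `map_join` and the row layout (lists of dicts keyed by `id`) are assumptions made for the example and do not mirror Hive internals.

```python
from collections import defaultdict


def map_join(big_rows, small_rows, key="id"):
    """Inner-join two lists of dicts by hashing the small side in memory."""
    # Build phase: load the whole small table into a hash table.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)
    # Probe phase: stream the big table once; no shuffle or reduce step.
    for big in big_rows:
        for small in lookup.get(big[key], []):
            yield {**small, **big}


small = [{"id": 1, "dept": "a"}, {"id": 2, "dept": "b"}]
big = [{"id": 1, "url": "x"}, {"id": 1, "url": "y"}, {"id": 3, "url": "z"}]
for joined in map_join(big, small):
    print(joined)
```

Only the small table has to fit in memory, which is why the size threshold for small tables matters.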
Hands-on example:
(1) Enable MapJoin (the default is true)
set hive.auto.convert.join = true;
(2) Run the small table JOIN big table statement
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s join bigtable b on s.id = b.id;
(3) Run the big table JOIN small table statement
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable b join smalltable s on s.id = b.id;
By default, rows with the same key in the Map phase are sent to one reducer, so the data is skewed when a single key has too many rows.
Not every aggregation has to be completed on the Reduce side. Many aggregations can be partially performed on the Map side first, with the final result produced on the Reduce side.
1. Parameters for map-side aggregation
(1) Whether to aggregate on the Map side (the default is true)
hive.map.aggr = true
(2) Number of rows on which map-side aggregation is performed
hive.groupby.mapaggr.checkinterval = 100000
(3) Load-balance when the data is skewed (the default is false)
hive.groupby.skewindata = true
When this option is set to true, the generated query plan contains two MR jobs. In the first job, the map output is distributed randomly to the reducers; each reducer performs a partial aggregation and writes its result. Rows with the same Group By key may therefore go to different reducers, which balances the load. The second job then distributes the pre-aggregated results to the reducers by Group By key (this guarantees that rows with the same Group By key reach the same reducer) and completes the final aggregation.
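The two-job plan described above can be imitated in a few lines of Python to see why it balances the load. This is a simulation under simplifying assumptions: the aggregate is a plain count, the helper names `partial_counts` and `final_counts` are hypothetical, and the second job is modelled as a single merge rather than a real shuffle.

```python
import random
from collections import Counter


def partial_counts(rows, num_reducers, rng):
    """Job 1: spread rows randomly over reducers; each counts its own share."""
    buckets = [Counter() for _ in range(num_reducers)]
    for key in rows:
        buckets[rng.randrange(num_reducers)][key] += 1
    return buckets


def final_counts(buckets):
    """Job 2: merge the partial counts per key (in Hive, each key would be
    handled by exactly one reducer in this job)."""
    total = Counter()
    for bucket in buckets:
        total.update(bucket)
    return total


rng = random.Random(0)
rows = ["hot"] * 9000 + ["a", "b", "c"] * 100  # one heavily skewed key
stage1 = partial_counts(rows, num_reducers=4, rng=rng)
result = final_counts(stage1)
print("rows per reducer in job 1:", [sum(b.values()) for b in stage1])
print("final count for the skewed key:", result["hot"])
```

No reducer in the first job sees all rows of the skewed key, and the second job only has to merge a handful of partial results per key.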
With small data this does not matter. With large data, a COUNT DISTINCT has to be completed by a single reduce task, and that one reducer has so much data to process that the whole job struggles to finish. COUNT DISTINCT is therefore usually replaced by a GROUP BY followed by a COUNT:
1. Create a big table
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
2. Load the data
load data local inpath '/opt/module/datas/bigtable' into table bigtable;
3. Set the number of reducers to 5
set mapreduce.job.reduces = 5;
4. Run the deduplicating id query
select count(distinct id) from bigtable;
5. Deduplicate id with GROUP BY
select count(id) from (select id from bigtable group by id) a;
This takes one extra job, but with large data volumes it is usually well worth it.
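Why the GROUP BY rewrite parallelizes can be seen in a small Python simulation. The sketch assumes that ids are partitioned by a hash (here `zlib.crc32`, chosen only for determinism) so that equal ids always land on the same reducer; the function name `count_distinct_two_stage` is hypothetical.

```python
import zlib


def count_distinct_two_stage(ids, num_reducers=5):
    """Stage 1 dedupes per reducer (the GROUP BY); stage 2 adds the counts."""
    partitions = [set() for _ in range(num_reducers)]
    for value in ids:
        # Equal ids always hash to the same reducer, so the sets never overlap.
        index = zlib.crc32(str(value).encode("utf-8")) % num_reducers
        partitions[index].add(value)
    # The final COUNT only has to sum one small number per reducer.
    return sum(len(p) for p in partitions)


ids = [i % 1000 for i in range(100000)]
print(count_distinct_two_stage(ids))  # 1000
```

The expensive deduplication is shared by all reducers, and only the cheap final sum runs in one place.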
Avoid Cartesian products where possible. If a join has no ON condition, or an invalid ON condition, Hive can use only 1 reducer to compute the Cartesian product.
Column pruning: in SELECT, take only the columns you need, use partition filters whenever partitions exist, and use SELECT * sparingly.
Row pruning: with partition pruning and outer joins, if the filter condition of the secondary table is written after WHERE, the full tables are joined first and the result is filtered afterwards. For example:
Hands-on example:
1. Join the two tables first, then filter with WHERE
select o.id from bigtable b join ori o on o.id = b.id where o.id <= 10;
2. Filter in a subquery first, then join
select b.id from bigtable b join (select id from ori where id <= 10) o on b.id = o.id;
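The difference between the two query shapes is the number of rows that pass through the join. The Python sketch below counts those rows for both orders. It assumes that ids in `ori` are unique and that no optimizer reorders the filter, and the function names are hypothetical.

```python
def join_then_filter(big_ids, ori_ids, limit=10):
    """Join on id first, then keep ids <= limit.
    Returns (result, number of rows produced by the join)."""
    ori = set(ori_ids)  # assumes unique ids in ori
    joined = [i for i in big_ids if i in ori]
    return [i for i in joined if i <= limit], len(joined)


def filter_then_join(big_ids, ori_ids, limit=10):
    """Filter ori in a subquery first, then join.
    Returns (result, number of rows produced by the join)."""
    ori = {i for i in ori_ids if i <= limit}
    joined = [i for i in big_ids if i in ori]
    return joined, len(joined)


big_ids = list(range(1, 1001))
ori_ids = list(range(1, 1001))
print(join_then_filter(big_ids, ori_ids))
print(filter_then_join(big_ids, ori_ids))
```

Both orders return the same ids, but filtering first joins 10 rows instead of 1000.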
When data is inserted into a partitioned table in a relational database, the database automatically places each row in the right partition based on the value of the partition column. Hive provides a similar mechanism, dynamic partitioning (Dynamic Partition), but it has to be configured before use.
1. Parameters for enabling dynamic partitioning
(1) Enable dynamic partitioning (default true, enabled)
hive.exec.dynamic.partition=true
(2) Set non-strict mode (the dynamic partition mode defaults to strict, which requires at least one partition to be specified as a static partition; nonstrict allows all partition columns to be dynamic)
hive.exec.dynamic.partition.mode=nonstrict
(3) Maximum number of dynamic partitions that can be created in total across all nodes that execute MR
hive.exec.max.dynamic.partitions=1000
(4) Maximum number of dynamic partitions that can be created on each node that executes MR. This parameter has to be set according to the actual data. For example, if the source data contains one year of data, the day field has 365 distinct values, so the parameter must be greater than 365. With the default value of 100 the job may fail with an error.
hive.exec.max.dynamic.partitions.pernode=100
(5) Maximum number of HDFS files that can be created in the whole MR job
hive.exec.max.created.files=100000
(6) Whether to throw an exception when an empty partition is generated. This usually does not need to be set.
hive.error.on.empty.partition=false
2. Hands-on example
Requirement: insert the data in ori into the matching partitions of the target table ori_partitioned_target by time (for example 20111230000008).
(1) Create a partitioned table
create table ori_partitioned(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) partitioned by (p_time bigint) row format delimited fields terminated by '\t';
(2) Load data into the partitioned table
load data local inpath '/home/ds1' into table ori_partitioned partition(p_time='20111230000010');
load data local inpath '/home/ds2' into table ori_partitioned partition(p_time='20111230000011');
(3) Create the target partitioned table
create table ori_partitioned_target(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';
(4) Configure dynamic partitioning and insert the data
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;
insert overwrite table ori_partitioned_target partition (p_time)
select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;
(5) Check the partitions of the target partitioned table
show partitions ori_partitioned_target;
1) Normally a job generates one or more map tasks from its input directory.
The main factors are the total number of input files, the size of the input files, and the block size configured for the cluster.
2) Is it better to have more maps?
No. If a job has many small files (much smaller than the 128 MB block size), each small file is treated as a block and handled by its own map task. When starting and initializing a map task takes much longer than its actual processing, a lot of resources are wasted. In addition, the number of map tasks that can run at the same time is limited.
3) Is it enough to make sure that every map processes a file block close to 128 MB?
Not necessarily. Take a 127 MB file, which would normally be handled by one map. If the file has only one or two small fields but tens of millions of records, and the map-side logic is complex, a single map task will be slow.
To address questions 2 and 3, two approaches are needed: reducing the number of maps and increasing the number of maps.
Merge small files before the maps run to reduce the number of maps: CombineHiveInputFormat can merge small files (it is the system default format), whereas HiveInputFormat cannot.
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
When the input files are large, the task logic is complex, and the maps run very slowly, consider increasing the number of maps so that each map processes less data, which improves the efficiency of the job.
How to increase the number of maps: the split size follows the formula computeSplitSize = Math.max(minSize, Math.min(maxSize, blockSize)), which equals blockSize = 128 MB with the default settings. Lowering maxSize below blockSize makes the splits smaller and therefore increases the number of maps.
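The formula can be checked with a few numbers. The Python sketch below restates it and derives a rough map count for a single file. The 128 MB block size comes from the text; `estimate_splits` is a hypothetical helper that simply divides the file size by the split size and ignores details such as the tolerance Hadoop applies to the last split.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # 128 MB, the block size used in the text


def compute_split_size(min_size, max_size, block_size=BLOCK_SIZE):
    """max(minSize, min(maxSize, blockSize)), as in the formula above."""
    return max(min_size, min(max_size, block_size))


def estimate_splits(file_size, min_size=1, max_size=2**63 - 1,
                    block_size=BLOCK_SIZE):
    """Rough map-task count for one file: file size divided by split size."""
    split_size = compute_split_size(min_size, max_size, block_size)
    return math.ceil(file_size / split_size)


file_size = 256 * 1024 * 1024
print(estimate_splits(file_size))                              # 2
print(estimate_splits(file_size, max_size=64 * 1024 * 1024))   # 4
```

Halving maxSize below the block size doubles the estimated number of maps for the same file.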
Hands-on example:
1. Run the query
select count(*) from emp;
2. Set the maximum split size to 100 bytes and run the query again
set mapreduce.input.fileinputformat.split.maxsize=100;
select count(*) from emp;
1. Adjusting the number of reducers, method one
(1) Amount of data processed by each reducer (the default is 256 MB)
hive.exec.reducers.bytes.per.reducer=256000000
(2) Maximum number of reducers per job (the default is 1009)
hive.exec.reducers.max=1009
(3) Formula for the number of reducers
N = min(parameter 2, total input size / parameter 1)
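A worked example of the formula, as a small Python sketch. The defaults are the values quoted in the text; rounding the division up and returning at least 1 reducer are assumptions of this sketch, and `estimate_reducers` is a hypothetical name.

```python
import math


def estimate_reducers(total_input_bytes,
                      bytes_per_reducer=256 * 1000 * 1000,
                      max_reducers=1009):
    """N = min(parameter 2, total input size / parameter 1), at least 1."""
    needed = math.ceil(total_input_bytes / bytes_per_reducer)
    return max(1, min(max_reducers, needed))


print(estimate_reducers(10 * 256 * 1000 * 1000))  # 10
print(estimate_reducers(10**15))                  # 1009, capped by parameter 2
```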
2. Adjusting the number of reducers, method two
Modify it in Hadoop's mapred-default.xml file.
Set the number of reducers for each job:
set mapreduce.job.reduces = 15;
3. More reducers is not always better
1) Starting and initializing too many reducers also costs time and resources.
2) In addition, there are as many output files as there are reducers. If many small files are generated and then used as the input of the next job, the problem of too many small files appears again.
Keep these two principles in mind when setting the number of reducers: use an appropriate number of reducers for large data volumes, and give each reduce task an appropriate amount of data to process.
Hive converts a query into one or more stages. A stage can be a MapReduce stage, a sampling stage, a merge stage, a limit stage, or any other stage that Hive may need during execution. By default, Hive executes only one stage at a time. A particular job may, however, consist of many stages that do not all depend on each other, so some stages can run in parallel, which may shorten the execution time of the whole job. The more stages can run in parallel, the sooner the job is likely to finish.
Setting hive.exec.parallel to true enables concurrent execution. In a shared cluster, however, be careful: if more stages of a job run in parallel, cluster utilization rises.
-- Enable parallel execution of stages
set hive.exec.parallel=true;
-- Maximum degree of parallelism allowed for one SQL statement; the default is 8
set hive.exec.parallel.thread.number=16;
Of course, this only helps when system resources are relatively idle. Without spare resources, nothing actually runs in parallel.
Hive provides a strict mode that prevents users from running queries that may have unexpected adverse effects.
The property hive.mapred.mode defaults to the non-strict mode nonstrict. To turn strict mode on, set hive.mapred.mode to strict. Strict mode forbids 3 types of queries.
<property>
  <name>hive.mapred.mode</name>
  <value>strict</value>
  <description>
    The mode in which the Hive operations are being performed.
    In strict mode, some risky queries are not allowed to run. They include:
    Cartesian Product.
    No partition being picked up for a query.
    Comparing bigints and strings.
    Comparing bigints and doubles.
    Orderby without limit.
  </description>
</property>
For partitioned tables, a query is not allowed to run unless its WHERE clause contains a filter on a partition column that limits the range. In other words, users are not allowed to scan all partitions. The reason for this restriction is that partitioned tables typically hold very large data sets that grow quickly, and a query without a partition filter can consume an unacceptable amount of resources.
Queries that use ORDER BY must also use LIMIT. ORDER BY sends all result rows to a single reducer for sorting, so forcing the user to add LIMIT keeps that reducer from running for a very long time.
Cartesian product queries are restricted. Users familiar with relational databases may expect to write a JOIN with a WHERE clause instead of an ON clause, relying on the optimizer to convert the WHERE condition into an ON condition efficiently. Unfortunately, Hive does not perform this optimization, so if the tables are large enough the query gets out of control.
JVM reuse is a Hadoop tuning parameter that has a large impact on Hive performance, especially in scenarios where small files are hard to avoid or where there are very many tasks, most of which run only briefly.
Hadoop's default configuration usually forks a new JVM for each map and reduce task. JVM startup can then be a considerable overhead, especially when a job contains hundreds or thousands of tasks. JVM reuse lets a JVM instance be reused N times within the same job. N can be configured in Hadoop's mapred-site.xml file. It is usually between 10 and 20; the right value has to be determined by testing against the specific workload.
<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is no limit.</description>
</property>
The downside of this feature is that with JVM reuse enabled, the task slots in use stay occupied for reuse and are not released until the job finishes. If an unbalanced job has a few reduce tasks that run much longer than the others, the reserved slots sit idle but cannot be used by other jobs until all tasks have finished.
In a distributed cluster, program bugs (including bugs in Hadoop itself), unbalanced load, or uneven resource distribution can cause the tasks of one job to run at different speeds. Some tasks may run significantly slower than others (for example, one task of a job is only 50% done while all other tasks have finished), and these tasks slow down the whole job. To avoid this, Hadoop uses a speculative execution (Speculative Execution) mechanism. It identifies straggling tasks according to certain rules and starts a backup task for each of them, which processes the same data as the original task. The result of whichever copy finishes successfully first is taken as the final result.
To enable speculative execution, configure these parameters in Hadoop's mapred-site.xml file:
<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
  <description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>true</value>
  <description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
Hive itself also provides a configuration item to control reduce-side speculative execution:
<property>
  <name>hive.mapred.reduce.tasks.speculative.execution</name>
  <value>true</value>
  <description>Whether speculative execution for reducers should be turned on.</description>
</property>
It is hard to give specific advice on tuning these speculative execution settings. If users are sensitive to variation in run time, these features can be turned off. If map or reduce tasks run for a long time because of large input, the waste caused by launching speculative copies is large.
1. Basic syntax
EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
2. Hands-on example
(1) View the execution plan of the following statements
explain select * from emp;
explain select deptno, avg(sal) avg_sal from emp group by deptno;
(2) View the detailed execution plan
explain extended select * from emp;
explain extended select deptno, avg(sal) avg_sal from emp group by deptno;
Copyright notice: this article was written by [Keven He]. Please include a link to the original when reposting. Thank you. https://javamana.com/2022/134/202205141423370465.html