Thursday, 26 April 2018

Hive Architecture

The main Apache Hive components are:
Metastore – The metastore stores metadata for each table, such as its schema and location, together with the partition metadata. The driver relies on this metadata to keep track of the data sets distributed over the cluster, so it is crucial to Hive. The metadata is kept in a traditional RDBMS, and a backup server regularly replicates it so that it can be recovered in case of data loss.

Driver – The driver acts like a controller that receives HiveQL statements. It starts the execution of a statement by creating a session, and it monitors the life cycle and progress of the execution. The driver stores the metadata generated during the execution of a HiveQL statement. It also acts as the collection point for the data or query result obtained after the Reduce operation.

Compiler – The compiler compiles the HiveQL query into an execution plan. The plan contains the tasks and the steps that MapReduce must perform to produce the output the query asks for. The compiler first converts the query to an Abstract Syntax Tree (AST). After checking the AST for compatibility and compile-time errors, it converts the AST to a Directed Acyclic Graph (DAG).

Optimizer – The optimizer performs various transformations on the execution plan to produce an optimized DAG. It can aggregate transformations, such as converting a pipeline of joins to a single join, for better performance. It can also split tasks, such as applying a transformation on data before a reduce operation, to improve performance.

Executor – Once compilation and optimization are complete, the executor executes the tasks. The executor also takes care of pipelining the tasks.

CLI, UI, and Thrift Server – The CLI (command-line interface) and UI provide a user interface for an external user to interact with Hive. The Thrift server allows external clients to interact with Hive over a network, similar to the JDBC or ODBC protocols.

Hive Shell
     The Hive shell is the command-line interface for Hive, and it is similar to the MySQL shell. Inside it we can issue commands and run queries written in HiveQL (HQL). Like SQL, HiveQL is case-insensitive, except for string comparisons. We can run the Hive shell in two modes: non-interactive and interactive.

1. Non-Interactive mode – With the -f option we specify the location of a file that contains HQL queries, and the Hive shell runs them without prompting. For example: hive -f my-script.q

2. Interactive mode – We start the Hive shell and then submit queries manually and read the results there. For example: $ bin/hive opens the Hive shell.

Features of Hive
     Apache Hive has many features. The main ones are:
1. Hive makes data summarization, querying, and analysis much easier.
2. Hive supports external tables, which make it possible to process data without first moving it into the Hive warehouse directory.
3. Apache Hive fits the low-level interface requirements of Hadoop well.
4. It supports partitioning of data at the table level to improve performance.
5. Hive has a rule-based optimizer for optimizing logical plans.
6. It is scalable, familiar, and extensible.
7. Using HiveQL doesn’t require knowledge of a programming language; knowledge of basic SQL queries is enough.
8. We can easily process structured data in Hadoop using Hive.
9. Querying in Hive is simple because HiveQL is similar to SQL.
10. We can also run ad-hoc queries for data analysis using Hive.

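Limitations of Hive
1. Hive does not offer real-time queries.
2. Hive does not offer row-level updates or deletes on ordinary tables; it supports overwriting or appending data. Row-level UPDATE and DELETE are available only on transactional (ACID) tables, from Hive 0.14 onward.
3. Latency for Apache Hive queries is generally very high; Hive aims for acceptable, not optimal, latency for interactive data browsing.
4. Sub-query support is limited: early versions accept sub-queries only in the FROM clause, and sub-queries in the WHERE clause (IN, EXISTS) are supported from Hive 0.13 onward.
5. It is not good for online transaction processing.
6. Join conditions were long limited to equality comparisons. Outer joins themselves (LEFT, RIGHT and FULL OUTER JOIN) are supported.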

Hive Architecture and its Components
     Hive Architecture can be categorized into the following components.
1. Hive Clients
     Apache Hive supports applications written in languages such as C++, Java and Python through its JDBC, Thrift and ODBC drivers, so a Hive client application can be written in the language of one's choice. These clients fall into 3 types:
1. Thrift Clients – Because the Apache Hive server is based on Thrift, it can serve requests from any language that supports Thrift.
2. JDBC Clients – Apache Hive allows Java applications to connect to it using the JDBC driver, which is defined in the class org.apache.hadoop.hive.jdbc.HiveDriver (HiveServer2 uses org.apache.hive.jdbc.HiveDriver).
3. ODBC Clients – The ODBC driver allows applications that support the ODBC protocol to connect to Hive. Like the JDBC driver, the ODBC driver uses Thrift to communicate with the Hive server.

2. Hive Services
     Hive provides various services, such as a web interface and a CLI, to perform queries.
1. Hive CLI (Command Line Interface) – This is the default shell that Hive provides, in which you can execute Hive queries and commands directly.
2. Apache Hive Web Interface – Apart from the command line interface, Hive also provides a web-based GUI for executing Hive queries and commands.
3. Hive Server – The Hive server is built on Apache Thrift and is therefore also referred to as the Thrift Server. It allows different clients to submit requests to Hive and retrieve the final result.
4. Hive Driver – The driver receives the queries that a Hive client submits through the Thrift, JDBC, ODBC, CLI or Web UI interfaces.
  • Compiler – The driver passes the query to the compiler, where parsing, type checking and semantic analysis take place with the help of the schema present in the metastore.
  • Optimizer – It generates the optimized logical plan in the form of a DAG (Directed Acyclic Graph) of MapReduce and HDFS tasks.
  • Executor – Once compilation and optimization are complete, the execution engine executes these tasks in the order of their dependencies using Hadoop.
5. Metastore – The metastore is the central repository of Apache Hive metadata in the Hive architecture. It stores metadata for Hive tables (such as their schema and location) and partitions in a relational database, and it gives clients access to this information through the metastore service API. The Hive metastore consists of two fundamental units:
  • A service that provides metastore access to other Apache Hive services.
  • Disk storage for the Hive metadata, which is separate from HDFS storage.
3. Processing framework and Resource Management – Hive internally uses the Hadoop MapReduce framework to execute queries.

4. Distributed Storage – Hive is built on top of Hadoop, so it uses the underlying HDFS for distributed storage.

How to process data with Apache Hive?
1. The User Interface (UI) calls the execute interface of the driver.
2. The driver creates a session handle for the query and sends the query to the compiler to generate an execution plan.
3. The compiler needs metadata, so it sends a getMetaData request to the metastore and receives the metadata back in the sendMetaData response.
4. The compiler uses this metadata to type check the expressions in the query. It then generates the plan, which is a DAG of stages, with each stage being a map/reduce job, a metadata operation or an operation on HDFS. For map/reduce stages, the plan contains map operator trees and a reduce operator tree.
5. The execution engine submits these stages to the appropriate components. In each task, the deserializer associated with the table or intermediate outputs reads the rows from HDFS files, and the rows are passed through the associated operator tree. The generated output is written to a temporary HDFS file through the serializer. These temporary files feed the subsequent map/reduce stages of the plan. For DML operations, the final temporary file is moved to the table’s location.
6. For queries, the execution engine reads the contents of the temporary file directly from HDFS as part of the fetch call from the driver.
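
The ordering in steps 4 to 6 can be pictured with a small toy model. The following Python sketch is not Hive code: the stage names and the run_plan helper are made up for illustration, and it only shows how a DAG of stages might be run so that every stage starts after the stages it depends on.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical plan: each stage maps to the set of stages it depends on.
plan = {
    "scan": set(),          # map/reduce stage that reads the table
    "aggregate": {"scan"},  # map/reduce stage fed by scan's temporary file
    "move": {"aggregate"},  # HDFS operation that moves the final temporary file
}

def run_plan(plan):
    """Return the stages in an order that respects their dependencies."""
    return list(TopologicalSorter(plan).static_order())

print(run_plan(plan))
```

Because each stage here depends on exactly one earlier stage, only one valid order exists; a real plan may allow independent stages to run in parallel.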

Hive Metastore
     As described above, the metastore is the central repository of Apache Hive metadata: a service that gives other Hive services access to the metadata, plus disk storage for that metadata which is separate from HDFS storage.
     There are three modes for Hive Metastore deployment:
1. Embedded Metastore
2. Local Metastore
3. Remote Metastore

1. Embedded Metastore  
     By default, the metastore service and the Hive service both run in the same JVM and use an embedded Derby database instance that stores the metadata on the local disk. This is called the embedded metastore configuration. In this case, only one user can connect to the metastore database at a time; if you start a second instance of the Hive driver, you will get an error. This is good for unit testing, but not for practical solutions.

2. Local Metastore
     Hive is a data-warehousing framework, so a single session is rarely enough. The local metastore configuration overcomes this limitation of the embedded metastore: it allows multiple Hive sessions, i.e. multiple users can use the metastore database at the same time. This is achieved by using any JDBC-compliant database, such as MySQL, that runs in a separate JVM or on a different machine, while the Hive service and the metastore service still run together in the same JVM. In general, the most popular choice is a MySQL server as the metastore database.

3. Remote Metastore
     In the remote metastore configuration, the metastore service runs in its own separate JVM and not in the Hive service JVM. Other processes communicate with the metastore server using the Thrift network APIs. We can also run one or more additional metastore servers in this case to provide higher availability. The main advantage of the remote metastore is that you do not need to share the JDBC login credentials with each Hive user to access the metastore database.

Databases Supported by Hive
      The Hive metastore supports 5 back-end databases:
1. Derby
2. MySQL
3. MS SQL Server
4. Oracle
5. PostgreSQL

Hive Data Model
      At a granular level, data in Hive is organized into three units:
1. Table
2. Partition
3. Bucket

1. Table
     Apache Hive tables are similar to the tables in a relational database. A table in Hive logically consists of the data being stored and the associated metadata that describes the layout of the data in the table. We can perform filter, project, join and union operations on tables. The data typically resides in HDFS, although it may reside in any Hadoop filesystem, including the local filesystem or S3. The metadata, however, is stored in a relational database and not in HDFS. Hive has two types of tables:
1. Managed Table
2. External Table
When we create a table in Hive, Hive manages the data by default, which means that it moves the data into its warehouse directory. Alternatively, we can create an external table, which tells Hive to refer to data at an existing location outside the warehouse directory. The difference between the two table types shows in the LOAD and DROP semantics. Let us consider a managed table first.

i. Managed Table
     When we load data into a managed table, Hive moves the data into the Hive warehouse directory.
CREATE TABLE managed_table (dummy STRING);
LOAD DATA INPATH '/user/ashok/ashok_data.txt' INTO TABLE managed_table;
     As the name suggests, Hive is responsible for managing the data of a managed table. If you load data from a file in HDFS into a managed table and then issue a DROP command on it, the data is deleted along with the table's metadata. The data belonging to the dropped managed_table then no longer exists anywhere in HDFS and you cannot retrieve it. The LOAD command is a move: it relocates the file from its HDFS location to the Hive warehouse directory.
Note: The default path of the warehouse directory is /user/hive/warehouse. The data of a Hive table resides in warehouse_directory/table_name (in HDFS). You can also specify the path of the warehouse directory with the hive.metastore.warehouse.dir configuration parameter in hive-site.xml.

ii. External Table
     An external table in Hive behaves differently: we control the creation and deletion of the data ourselves. The location of the external data is specified at table creation time.
CREATE EXTERNAL TABLE external_table (dummy STRING)
LOCATION '/user/ashok/external_table';
LOAD DATA INPATH '/user/ashok/ashok.txt' INTO TABLE external_table;
     For an external table, Hive is not responsible for managing the data. When you issue the LOAD command, Hive moves the file into the location specified for the table rather than into its warehouse directory, and it keeps only the metadata for the external table. If you then issue a DROP command on the external table, only the metadata regarding the table is deleted. You can therefore still retrieve the data of that external table from its location using HDFS commands.
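
The difference in DROP semantics can be mimicked with a toy model. The Python sketch below is not Hive code: the ToyHive class is hypothetical, its "file system" is just a set of path strings, and it assumes that LOAD moves a file under the table's own location (the warehouse directory for a managed table, the LOCATION path for an external one).

```python
WAREHOUSE = "/user/hive/warehouse"

class ToyHive:
    """Hypothetical model of managed vs external tables, for illustration only."""

    def __init__(self, files):
        self.files = set(files)  # paths that exist in the toy file system
        self.tables = {}         # table name -> (location, is_external)

    def create_table(self, name, external=False, location=None):
        # A managed table lives under the warehouse directory by default.
        self.tables[name] = (location or f"{WAREHOUSE}/{name}", external)

    def load(self, src, name):
        # LOAD is a move: the source path disappears, the file shows up
        # under the table's location.
        location, _ = self.tables[name]
        self.files.remove(src)
        self.files.add(f"{location}/{src.rsplit('/', 1)[-1]}")

    def drop(self, name):
        # Metadata is always removed; data is removed only for managed tables.
        location, external = self.tables.pop(name)
        if not external:
            self.files = {f for f in self.files if not f.startswith(location + "/")}

hive = ToyHive(["/user/ashok/ashok_data.txt", "/user/ashok/ashok.txt"])
hive.create_table("managed_table")
hive.load("/user/ashok/ashok_data.txt", "managed_table")
hive.create_table("external_table", external=True, location="/user/ashok/external_table")
hive.load("/user/ashok/ashok.txt", "external_table")
hive.drop("managed_table")
hive.drop("external_table")
print(sorted(hive.files))
```

After both DROP calls, only the file under the external table's location is left in the toy file system.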

2. Partition
     Apache Hive organizes tables into partitions, grouping similar data together based on a column or partition key. Each table in Hive can have one or more partition keys to identify a particular partition. Partitions make queries on slices of the data faster.
CREATE TABLE table_name (column1 data_type, column2 data_type)
PARTITIONED BY (partition1 data_type, partition2 data_type, …);
     Remember, the most common mistake made while creating partitions is to specify an existing column name as a partition column. Doing so produces the error “Error in semantic analysis: Column repeated in partitioning columns”.
     Let us understand partitioning with an example. Suppose a table student_details contains the student information of an engineering college, such as student_id, name, department and year. If we partition on the department column, the information of all the students belonging to a particular department is stored together in that partition. Physically, a partition is nothing but a sub-directory in the table directory.
     Let’s say we have data for three departments in our student_details table: CSE, ECE and IT. We will therefore have three partitions in total, one for each department, and all the data regarding a department resides in a separate sub-directory under the Hive table directory. For example, with a partition column named dept, all the student data regarding the CSE department is stored in /user/hive/warehouse/student_details/dept=CSE. Queries regarding CSE students then only have to look through the data present in the CSE partition. This makes partitioning very useful, as it reduces query latency by scanning only the relevant partition instead of the whole data set. In real-world implementations you will be dealing with hundreds of TBs of data, so imagine scanning that amount of data for a query where 95% of the data scanned is irrelevant to it.
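
The effect of restricting a query to one partition can be sketched in a few lines of Python. This is a toy model, not Hive's planner: the partition_dir and dirs_to_scan helpers are hypothetical, and the partition column is assumed to be named dept.

```python
WAREHOUSE = "/user/hive/warehouse"

def partition_dir(table, column, value):
    """Directory that holds one partition, following the column=value layout."""
    return f"{WAREHOUSE}/{table}/{column}={value}"

# One sub-directory per department under the table directory.
partitions = {
    dept: partition_dir("student_details", "dept", dept)
    for dept in ("CSE", "ECE", "IT")
}

def dirs_to_scan(partitions, wanted=None):
    """Directories a query has to read; wanted is a filter on the partition key."""
    if wanted is None:
        return sorted(partitions.values())  # no filter: scan every partition
    return [partitions[wanted]] if wanted in partitions else []

print(dirs_to_scan(partitions, "CSE"))  # a query on CSE students only
print(len(dirs_to_scan(partitions)))    # a query without a partition filter
```

A filter on the partition key narrows the scan to a single directory, while a query without such a filter has to read all three.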

3. Buckets
     You may divide each partition, or an unpartitioned table, into buckets based on the hash of a column in the table. Each bucket is just a file in the partition directory (or in the table directory for an unpartitioned table). Therefore, if you divide the partitions into n buckets, you will have n files in each partition directory.
CREATE TABLE table_name PARTITIONED BY (partition1 data_type, partition2 data_type, …)
CLUSTERED BY (column_name1, column_name2, …) [SORTED BY (column_name [ASC|DESC], …)]
INTO num_buckets BUCKETS;
     For example, suppose we bucket each partition into 2 buckets. Each partition, say CSE, will then have two files, each storing part of the CSE students’ data.

How does Hive distribute rows into buckets?
     Hive determines the bucket number for a row with the formula hash_function(bucketing_column) modulo num_of_buckets. Here, hash_function depends on the column data type. For example, if you bucket the table on a column user_id of INT datatype, hash_function(user_id) is simply the integer value of user_id. If you have created two buckets, Hive determines the bucket for each row in each partition by calculating (value of user_id) modulo 2. In this case, rows whose user_id is even land in one bucket of each partition and rows whose user_id is odd land in the other. The hash_function for other data types is more complex to calculate, and for a string the result is not humanly recognizable.
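
The formula can be tried out with a short Python sketch. It is an illustration under stated assumptions rather than Hive's implementation: bucket_for and assign are hypothetical helpers, the hash of an INT is taken to be the integer itself as described above, and bucket indexes are counted from 0.

```python
from collections import defaultdict

def bucket_for(user_id: int, num_buckets: int) -> int:
    """Bucket index for an INT bucketing column.

    The hash of an INT is taken to be the integer itself. Masking to a
    non-negative value is an assumption of this sketch, so that negative
    ids also map to a valid bucket index.
    """
    return (user_id & 0x7FFFFFFF) % num_buckets

def assign(user_ids, num_buckets):
    """Group user ids by the bucket they would land in."""
    buckets = defaultdict(list)
    for uid in user_ids:
        buckets[bucket_for(uid, num_buckets)].append(uid)
    return dict(buckets)

print(assign([101, 102, 103, 104, 105, 106], 2))
```

With two buckets, the even ids (102, 104, 106) end up together in bucket 0 and the odd ids (101, 103, 105) in bucket 1.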

     If you are using Apache Hive 0.x or 1.x, you have to issue the command set hive.enforce.bucketing = true; from your Hive terminal before performing bucketing. This gives you the correct number of reducers when you use the CLUSTER BY clause to bucket a column. If you have not done so, you may find that the number of files generated in your table directory is not equal to the number of buckets. As an alternative, you may set the number of reducers equal to the number of buckets with set mapred.reduce.tasks = num_buckets;

Why do we need buckets?
     There are 2 reasons for bucketing a partition:
1. A map-side join requires the data belonging to a unique join key to be present in the same partition. When your partition key differs from the join key, you can still perform a map-side join by bucketing the table on the join key.
2. Bucketing makes the sampling process more efficient and therefore allows us to decrease the query time.
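
The first reason can be illustrated with a toy join in Python. This is a sketch of the idea, not of Hive's join operator: the bucketize and bucketed_join helpers and the marks table are hypothetical, and both inputs are assumed to be bucketed on the same integer join key with the same number of buckets.

```python
from collections import defaultdict

def bucketize(rows, key, num_buckets):
    """Split rows into buckets by (integer join key) modulo num_buckets."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key] % num_buckets].append(row)
    return buckets

def bucketed_join(left, right, key, num_buckets):
    """Join two row lists bucket by bucket instead of all-against-all."""
    left_b = bucketize(left, key, num_buckets)
    right_b = bucketize(right, key, num_buckets)
    joined = []
    for b in range(num_buckets):
        # Only the matching bucket of the right side is held in memory.
        lookup = defaultdict(list)
        for r in right_b[b]:
            lookup[r[key]].append(r)
        for l in left_b[b]:
            for r in lookup.get(l[key], []):
                joined.append({**l, **r})
    return joined

students = [
    {"student_id": 1, "name": "A"},
    {"student_id": 2, "name": "B"},
    {"student_id": 3, "name": "C"},
]
marks = [
    {"student_id": 2, "score": 70},
    {"student_id": 3, "score": 85},
    {"student_id": 3, "score": 90},
]
result = bucketed_join(students, marks, "student_id", 2)
print(result)
```

Rows with equal join keys always fall into the same bucket index on both sides, so each bucket of one table only needs to be matched against the corresponding bucket of the other.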