Journal of Advances in Computer Engineering and Technology
Adaptive Dynamic Data Placement Algorithm for Hadoop in Heterogeneous Environments
Avishan Sharafi (Department of Computer Engineering, Islamic Azad University, South Tehran Branch; Avishan.Sharafi@gmail.com), Ali Rezaee (Department of Computer Engineering, Islamic Azad University, Science and Research Branch, Tehran, Iran; Alirezaee.uni@gmail.com)
Received (Day Month Year)
Revised (Day Month Year)
Accepted (Day Month Year)
Abstract—The Hadoop MapReduce framework is an important distributed processing model for large-scale data-intensive applications. The current Hadoop and the existing HDFS rack-aware data placement strategy assume a homogeneous cluster in which every node has the same computing capacity and receives the same workload. Default Hadoop also ignores the load state of each node when distributing input data blocks, which may cause unnecessary overhead. In practice, such a data placement policy can noticeably reduce MapReduce performance and may increase energy dissipation in heterogeneous environments. This paper proposes a resource-aware adaptive dynamic data placement algorithm (ADDP). With ADDP, the unbalanced node workload problem is resolved based on each node's load status: the proposed method dynamically adapts and balances the data stored on each node of a heterogeneous Hadoop cluster. Experimental results show that data transfer overhead decreases in comparison with the DDP and traditional Hadoop algorithms. Moreover, the proposed method decreases execution time and improves the system's throughput by increasing resource utilization.
Index Terms—Hadoop, MapReduce, Resource-aware, HDFS, Data placement, Heterogeneous
I. INTRODUCTION
In recent years, the World Wide Web has been adopted as a very useful platform for developing data-intensive applications, since the communication paradigm of the Web is sufficiently open and powerful. Search engines, webmail, data mining, and social network services are currently indispensable data-intensive applications. These applications need data from a few gigabytes to several terabytes or even petabytes. Google leverages the MapReduce model to process approximately twenty petabytes of data per day in a parallel programming model [1]. Hadoop MapReduce is an attractive model for parallel data processing in high-performance cluster computing environments, and the Hadoop MapReduce framework was primarily developed by Yahoo [2]. Hadoop is used by Yahoo servers, where hundreds of terabytes of data are generated on at least 10,000 cores [3]. Facebook uses Hadoop to process more than 15 terabytes of data per day. In addition to Yahoo and Facebook, Amazon and Last.fm employ Hadoop to manage massive amounts of data [1].
The scalability of MapReduce is proven to be high, because in the MapReduce programming model a job is divided into a series of small tasks that run on multiple machines in a large-scale cluster [4]. MapReduce allows a programmer with no specific knowledge of distributed programming to create his/her MapReduce functions running in parallel across multiple nodes in the cluster. MapReduce automatically handles the gathering of results across the multiple nodes and returns a single result or set of results to the server [4]. More importantly, the MapReduce platform offers fault tolerance: it can automatically handle failures through its fault tolerance mechanisms. When a node fails, MapReduce reruns the tasks of the failed node on another node [5].
In the Hadoop architecture, data locality is one of the important factors affecting the performance of Hadoop applications. However, in a heterogeneous environment the data required for performing a task is often non-local, which degrades the performance of the Hadoop platform [4]. The data placement decision of the Hadoop Distributed File System (HDFS) is very important for data locality, which is a determining factor for MapReduce performance and a primary criterion for task scheduling in the MapReduce model. The existing HDFS rack-aware data placement strategy and replication scheme work well with the MapReduce framework in homogeneous Hadoop clusters [6], but in practice such a data placement policy can noticeably reduce performance in heterogeneous environments and may increase the overhead of transferring unprocessed data from slow nodes to fast nodes [7].
II. Related work and motivation
This section introduces the Hadoop system and the motivation for this study. Subsection 1 describes the features of Hadoop, subsections 2 and 3 describe MapReduce and HDFS, and subsection 4 presents the background and motivation for this work.
1. Hadoop
Hadoop is a successful and well-known implementation of the MapReduce model; it is open-source and supported by the Apache Software Foundation.
Hadoop consists of two main components: the MapReduce programming model and the Hadoop Distributed File System (HDFS) [4], in which MapReduce is responsible for parallel processing and HDFS is responsible for data management. Hadoop partitions a job into several tasks, and HDFS partitions the input data into blocks and assigns them to every node in the cluster. Hadoop MapReduce adopts a master/slave architecture, in which a master node controls a group of slave nodes on which the Map and Reduce functions run in parallel; the slaves process the tasks that the master assigns to them. In the MapReduce model, the master is called the JobTracker and each slave is called a TaskTracker; in HDFS, the master is called the NameNode and each slave is called a DataNode. The master is responsible for distributing data blocks and assigning task slots to every node in the Hadoop cluster. Default Hadoop assumes that the computing capacity and storage capacity of the nodes in the cluster are the same. In such a homogeneous environment, the Hadoop data placement strategy can boost the efficiency of the MapReduce model, but in a heterogeneous environment such a data placement strategy causes many problems.
2. MapReduce
MapReduce is a parallel programming model used in clusters that have numerous nodes and use their computing resources to process large amounts of data in parallel. MapReduce was proposed by Google in 2004. In the MapReduce model, the work an application should process is called a "job". Hadoop divides a MapReduce job into pieces called "map tasks" and "reduce tasks", in which the map tasks run the map function and the reduce tasks run the reduce function. The map function processes the input data assigned by the master node and produces intermediate (key, value) pairs. Based on the (key, value) pairs generated by the map function, the reduce function then merges, sorts, and returns the result. The MapReduce model is based on the "master/slave" concept: it distributes a large amount of input data to many processing nodes for parallel processing, which reduces execution time and improves performance. The input data are divided into many equally sized data blocks, and these blocks are assigned to nodes that perform the same map function in parallel. After the map function is performed, the generated output is a set of intermediate (key, value) pairs. The nodes that perform the reduce function obtain these intermediate data and finally generate the output data [8]. The MapReduce model was conceived with the principle that "moving computation is much cheaper than moving data" [5].
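To make the map and reduce functions concrete, the sketch below shows a standard WordCount job (the same job type used later in the experiments) written against the ordinary org.apache.hadoop.mapreduce API; the class names and job configuration are illustrative and not taken from the paper's code.

```java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map: emit an intermediate (word, 1) pair for every token in the input split.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce: sum the counts gathered for each word across all map outputs.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```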
3. HDFS
The Hadoop Distributed File System (HDFS) is based on the Google File System and is used together with the MapReduce model. It consists of a NameNode module in the master node and DataNode modules in the slave nodes. The NameNode is responsible for the management and storage of the entire file system and file information (such as the namespace and metadata). The NameNode partitions the files written into HDFS into data blocks; the blocks are the same size, with a default size of 64 MB. HDFS allocates these data blocks to the DataNodes, which are responsible for storing and processing the data blocks and sending the results to the NameNode. Hadoop is fault-tolerant and keeps three replicas of each data block of the files stored in HDFS. HDFS's replica placement strategy is to put one replica of the block on one node in the local rack, another on a different node in the same rack, and the third on a node in some other rack. When a node fails, these replicas become very important, and they are processed instead of the lost data blocks.
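As a small illustration of the block size and replication settings described above, the following sketch writes a file into HDFS with an explicit replication factor of 3 and a 64 MB block size through the public FileSystem API; the NameNode address and file path are placeholders, not values from the paper's cluster.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:9000");     // illustrative NameNode address

        try (FileSystem fs = FileSystem.get(conf)) {
            Path file = new Path("/data/input/sample.txt");   // illustrative path
            short replication = 3;                            // HDFS default replica count
            long blockSize = 64L * 1024 * 1024;               // 64 MB blocks, as in the paper
            int bufferSize = 4096;

            // Create the file with the requested replication factor and block size.
            try (FSDataOutputStream out =
                     fs.create(file, true, bufferSize, replication, blockSize)) {
                out.writeBytes("example content\n");
            }
        }
    }
}
```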
4. Background and motivation
The default Hadoop data placement strategy assumes that the computing capacity and storage capacity of each node in the cluster are the same, so each node is assigned the same workload. In such a homogeneous environment the data placement strategy can boost the efficiency of the MapReduce model, but in a heterogeneous environment it causes many problems. In a heterogeneous environment, the difference in the nodes' computing capacities may cause load imbalance: different computing capacities lead to different task execution times, so the faster nodes finish processing their local data blocks sooner than the slower nodes. At this point the master assigns unperformed tasks to the idle faster nodes, but these nodes do not own the data needed for processing, so the required data must be transferred from slow nodes to idle fast nodes through the network. Waiting for this data transmission increases task execution time and therefore extends the execution time of the entire job, and a large amount of moved data degrades Hadoop performance. To improve the performance of Hadoop in heterogeneous clusters, this paper aims to minimize data movement between slow and fast nodes. This goal can be achieved by a data placement scheme that distributes and stores data across multiple heterogeneous nodes based on their computing capacities. Data movement can be reduced if each node is assigned a workload based on its data processing speed and system load [4, 7].
Several task scheduling strategies have been proposed for the Hadoop framework in recent years. Reference [9] proposed an Adaptive Task Scheduling Strategy Based on Dynamic Workload Adjustment (ATSDWA): each TaskTracker collects its own load information and reports it to the JobTracker periodically, so TaskTrackers can adapt to load changes at runtime and obtain tasks in accordance with their computing abilities. Reference [4] proposed a data placement algorithm (DDP) that distributes input data blocks based on each node's computing capacity in a heterogeneous Hadoop cluster. Reference [10] proposed a resource-aware scheduling algorithm that classifies jobs and node workloads into I/O-bound and CPU-bound categories; each workload is assigned to a group of nodes, and the algorithm selects appropriate tasks to run according to the workload of the node. Reference [11] explored an extensional MapReduce task scheduling algorithm for deadline constraints (MTSD) for Hadoop platforms, which allows the user to specify a job's deadline and tries to finish the job before that deadline. Reference [6] proposed a novel data placement strategy (SLDP) for heterogeneous Hadoop clusters that changes the traditional Hadoop data block replication policy based on data hotness. SLDP adopts a heterogeneity-aware algorithm to divide the various nodes into several virtual storage tiers and then places data blocks across the nodes in each storage tier according to the hotness of the data.
III. ADDP algorithm
1. Main Idea
The computing capacity of each node in a heterogeneous cluster is different, so the load of each node changes dynamically. The ADDP algorithm presented in this paper uses the type and load volume of jobs to adjust the distribution of input data blocks. The proposed algorithm consists of two main phases. In the first round, the NameNode distributes data blocks based on the computing capacity ratios of each node recorded in the Ratio table. In the following rounds, the load parameters of each node (average CPU utilization and average memory utilization) are monitored and stored in the node's History table. The NameNode then calculates for each node a data block number that is more compatible with its load status by comparing the node's load parameters with the cluster load parameters in the Load-Distribution-Patterns table. This table contains a load volume formula for each load state of a node, and these formulas determine the new workload that best matches the node's load situation. The new workload for each node is stored in a Cluster-History table and will be distributed to the nodes in the next rounds.
Fig. 1. How the NameNode deploys data blocks on the DataNodes.
The algorithm uses two tables: the Ratio table and the Load-Distribution-Patterns table. The Ratio table stores the computing capacity ratios of each node for different job types. The Load-Distribution-Patterns table stores the defined average CPU utilization (AvgCpuUsage) and average memory utilization (AvgMemUsage) as load parameters of the whole cluster. Each node's load parameters are compared with the cluster's load parameters to determine its new workload. In the cluster, we assume three main load states: the overloading state ("overload"), the underloading state ("underload"), and the normal loading state ("normal load"). There are also some sub-states of the underload state based on the cluster load situation, and for each of these states there is a row in the Load-Distribution-Patterns table. Each row has a load volume formula that shows how much workload should be added to the node's current workload to make it more compatible with the node's load state, so that the node uses resources more efficiently. The load parameters of each node are compared with the rows of the table; if a node's load parameters fall into a row of the table, the formula associated with that row calculates a new data load volume that is more appropriate for the node's current load state. The percentage of added workload is expressed by the λ factor: the next volume load average (VLAi+1) is equal to the previous volume load average (VLAi) plus a percentage of the current load average. This percentage factor differs from one row to another and depends on the node's load state; the percentage factors are defined in the Definition-lambda-factor table.
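Written compactly, the update rule described above is

    VLA_{i+1} = VLA_i + λ · VLA_i = (1 + λ) · VLA_i,

where λ is the percentage factor defined for the node's load state in Table 2.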
Table 1
Load-Distribution-Patterns table.
| Load state | Average CPU usage | Average memory usage | Load volume formula |
| Underload | | | |
| Normal load | | | |
| Overload | | | |
Table 2
Definition-lambda-factor table.
| Load state | Lambda definition |
| Very Underload | λ ≥ 0.5 |
| Underload | λ ≥ 0.33 |
| Underload near to Normal | λ ≥ 0.2 |
| NormalLoad | λ = 0 |
| Optimize-NormalLoad | λ chosen by the administrator |
| Overload | λ = −0.1 |
Each load volume formula in the Load-Distribution-Patterns table calculates a workload that is more compatible with the current node load situation. In general, there are six load levels, which are explained in the following.
If a node's state is "Very underload", the lambda factor in the load volume formula is at least 0.5, so the workload assigned to the node for the next round is its current workload plus at least 50% of that workload.
If a node's state is "Underload", the lambda factor in the load volume formula is at least 0.33, so the workload assigned to the node for the next round is its current workload plus at least 33% of that workload.
If a node's state is "Underload near to NormalLoad", the lambda factor in the load volume formula is at least 0.2, so the workload assigned to the node for the next round is its current workload plus at least 20% of that workload.
If a node's state is "NormalLoad", the lambda factor in the load volume formula is 0, because the node is in a normal situation and most of the time there is no need to add workload to its current workload. However, the cluster administrator can sometimes add some more workload to the node's current workload to optimize its resource utilization; in this Optimize-NormalLoad situation the lambda factor in the volume formula is chosen by the administrator. If a node's state is "Overload", the lambda factor in the load volume formula is −0.1, so the workload assigned to the node for the next round is its current workload minus at least 10% of that workload.
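The sketch below restates this workload-update step in code. The numeric thresholds that decide a node's load state are not given in the text, so the sketch only maps each named state to the λ value stated above and applies VLA(i+1) = (1 + λ)·VLA(i); treat the state-to-λ mapping and the method names as assumptions.

```java
public class WorkloadUpdater {

    enum LoadState { VERY_UNDERLOAD, UNDERLOAD, UNDERLOAD_NEAR_NORMAL,
                     NORMAL, OPTIMIZE_NORMAL, OVERLOAD }

    /** Lambda factor per load state, following the percentages stated in the text. */
    static double lambdaFor(LoadState state, double adminLambda) {
        switch (state) {
            case VERY_UNDERLOAD:        return 0.50;       // add at least 50% more data
            case UNDERLOAD:             return 0.33;       // add at least 33% more data
            case UNDERLOAD_NEAR_NORMAL: return 0.20;       // add at least 20% more data
            case NORMAL:                return 0.0;        // keep the current workload
            case OPTIMIZE_NORMAL:       return adminLambda; // administrator-chosen extra load
            case OVERLOAD:              return -0.10;      // remove at least 10% of the data
            default:                    throw new IllegalArgumentException();
        }
    }

    /** Next-round volume load: VLA(i+1) = VLA(i) + lambda * VLA(i). */
    static double nextVolumeLoad(double currentVolumeLoadMB, LoadState state, double adminLambda) {
        return (1.0 + lambdaFor(state, adminLambda)) * currentVolumeLoadMB;
    }

    public static void main(String[] args) {
        // Example: an overloaded node holding 100 MB is reduced to 90 MB for the next round.
        System.out.println(nextVolumeLoad(100.0, LoadState.OVERLOAD, 0.0));   // 90.0
        // An underloaded node holding 50 MB grows to about 66.5 MB.
        System.out.println(nextVolumeLoad(50.0, LoadState.UNDERLOAD, 0.0));   // 66.5
    }
}
```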
IV. Formulas
Formulas (1) to (4) are used to build the Ratio table, and formulas (5) to (8) are used to build the Load-Distribution-Patterns table.
(1) Node's total execution time of the test tasks, Ttotal(i)
(2) Node's average execution time, Tavg(i)
(3) Node's computing capacity, Tt(i)
(4) Node's computing capacity ratio, Rt(i)
(5) Node's average CPU utilization, AvgCpuUsage
(6) Node's average memory utilization, AvgMemUsage
(7) CPU utilization, CpuUsage
(8) Memory utilization, MemUsage
V. Formula Description
For the mathematical formulas, let Tavg(i) denote the average execution time to complete a batch of tasks on node i, and let Tt(i) denote the average time required to complete one task on node i [4].
In order to obtain real-time CPU usage information (CpuUsage), the relevant parameters in the Linux file /proc/stat can be used. Seven items are extracted from /proc/stat: user-mode time (user), low-priority user-mode time (nice), system-mode time (sys), idle task-mode time (idle), hard disk I/O preparation time (iowait), hardware interrupt time (irq), and software interrupt time (softirq). The file /proc/stat keeps track of a variety of statistics about the system since it was restarted. The time unit is called a "jiffy" (1/100 of a second on x86 systems). Thus, CpuUsage is calculated from the difference of the values between two sample points.
The memory utilization is defined as MemUsage. In order to obtain MemUsage in real time, the relevant parameters from the Linux file /proc/meminfo are used; they reflect the state of memory in real time. Four useful items are extracted from /proc/meminfo: total memory size (MemTotal), free memory (MemFree), block-device buffers (Buffers), and file cache (Cached). MemUsage can be calculated by (8) [9].
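Since the bodies of formulas (7) and (8) are not reproduced here, the following sketch shows one plausible reading of the sampling described above: CpuUsage is obtained from the difference of /proc/stat jiffy counters between two sample points, and MemUsage as (MemTotal − MemFree − Buffers − Cached) / MemTotal from /proc/meminfo. The busy-time aggregation is an assumption, not a formula taken from the paper.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ProcSampler {

    /** Returns {busyJiffies, totalJiffies} from the first "cpu" line of /proc/stat. */
    static long[] readCpuJiffies() throws IOException {
        String cpuLine = Files.readAllLines(Paths.get("/proc/stat")).get(0);
        String[] f = cpuLine.trim().split("\\s+");
        // f[1..7] = user, nice, sys, idle, iowait, irq, softirq
        long user = Long.parseLong(f[1]), nice = Long.parseLong(f[2]),
             sys  = Long.parseLong(f[3]), idle = Long.parseLong(f[4]),
             iowait = Long.parseLong(f[5]), irq = Long.parseLong(f[6]),
             softirq = Long.parseLong(f[7]);
        long total = user + nice + sys + idle + iowait + irq + softirq;
        long busy  = total - idle - iowait;          // assumed definition of "busy" time
        return new long[]{busy, total};
    }

    /** CpuUsage between two sample points, as a fraction in [0, 1]. */
    static double cpuUsage(long[] first, long[] second) {
        double busyDelta  = second[0] - first[0];
        double totalDelta = second[1] - first[1];
        return totalDelta == 0 ? 0.0 : busyDelta / totalDelta;
    }

    /** MemUsage = (MemTotal - MemFree - Buffers - Cached) / MemTotal, from /proc/meminfo. */
    static double memUsage() throws IOException {
        long memTotal = 0, memFree = 0, buffers = 0, cached = 0;
        for (String line : Files.readAllLines(Paths.get("/proc/meminfo"))) {
            String[] f = line.split("\\s+");         // e.g. "MemTotal:  16338708 kB"
            long kb = Long.parseLong(f[1]);
            switch (f[0]) {
                case "MemTotal:": memTotal = kb; break;
                case "MemFree:":  memFree  = kb; break;
                case "Buffers:":  buffers  = kb; break;
                case "Cached:":   cached   = kb; break;
            }
        }
        return (double) (memTotal - memFree - buffers - cached) / memTotal;
    }

    public static void main(String[] args) throws Exception {
        long[] s1 = readCpuJiffies();
        Thread.sleep(1000);                          // sampling interval
        long[] s2 = readCpuJiffies();
        System.out.printf("CpuUsage=%.2f MemUsage=%.2f%n", cpuUsage(s1, s2), memUsage());
    }
}
```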
The ADDP workflow and pseudocode follow.
Fig. 2. Workflow of ADDP
Algorithm 1: Adaptive Dynamic Data Placement Algorithm (ADDP)
1. Find the number of the cluster's nodes and the number of each node's cores.
2. Find the job type in the Ratio table.
3. If the job type does not exist in the Ratio table:
   3.1. Add the job type and the job input volume to the Cluster-History table, and the job type to the Ratio table.
   3.2. Distribute test data and test tasks to each of the cluster's nodes.
   3.3. Make a record for the job in the Ratio table (see Algorithm 2).
   3.4. Make the Load-Distribution-Patterns table (see Algorithm 3).
4. For each node in the cluster:
   4.1. BlockNumber = TotalBlockNumber × (node's computing-capacity ratio / sum of all nodes' ratios).
5. For each node in the cluster:
   5.1. Distribute the node's calculated number of data blocks.
6. For each node in the cluster:
   6.1. Calculate the node's AvgCpuUsage.
   6.2. Calculate the node's AvgMemUsage.
7. For each node in the cluster:
   7.1. Determine the node's load state by comparing its AvgCpuUsage and AvgMemUsage with the Load-Distribution-Patterns table's AvgCpuUsage and AvgMemUsage.
   7.2. Calculate the node's volume load for its load state using the Load-Distribution-Patterns table's formulas.
   7.3. Store the node's volume load in the Cluster-History table.
8. If all nodes' AvgCpuUsage and AvgMemUsage are utilized according to the Load-Distribution-Patterns table:
   8.1. Set the utilize flag = True.
   8.2. Store the utilize flag in the utilize field of the Cluster-History table.
9. Else:
   9.1. Set the utilize flag = False.
   9.2. Store the utilize flag in the utilize field of the Cluster-History table.
10. Else:
11.   If the input data volume exists in the Cluster-History table:
   11.1. Distribute the input data volume based on the block-number results in the Cluster-History table.
   11.2. Check the utilize flag in the utilize field of the Cluster-History table.
12.     If the input data volume is utilized according to the utilize field of the Cluster-History table:
   12.1. Print "Cluster is Utilized" and finish.
13.     Else: go to 6.
14.   Else: go to 4.
Algorithm for making Ratio-Table:
Algorithm 2: Make Ratio-Table
1. For each node:
   1.1. Distribute the test tasks.
   1.2. Calculate the node's total execution time (Ttotal).
   1.3. Calculate the node's average execution time (Tavg).
   1.4. Calculate the node's computing capacity (Tt).
   1.5. Calculate the node's computing-capacity ratio (Rt).
2. Fill the Computing-Capacity-Ratio table with the (Rt) ratios.
3. Add the job type to the Computing-Capacity-Ratio table (Ratio table).
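A possible realization of Algorithm 2 is sketched below. The text does not give the exact definitions of Tt and Rt, so the sketch assumes Tt is the per-task average time from the test batch and normalizes the slowest node to ratio 1, which reproduces the 1/2/4 ratios of Table 4; the timings in main are illustrative, not measurements from the paper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RatioTableBuilder {

    /**
     * @param totalExeTimeSec Ttotal: total time each node needed for the test-task batch
     * @param tasksInBatch    number of test tasks in the batch
     * @return Rt per node, with the slowest node getting ratio 1
     */
    static Map<String, Double> computeRatios(Map<String, Double> totalExeTimeSec, int tasksInBatch) {
        Map<String, Double> perTaskTime = new LinkedHashMap<>();       // assumed Tt = Tavg
        for (Map.Entry<String, Double> e : totalExeTimeSec.entrySet()) {
            perTaskTime.put(e.getKey(), e.getValue() / tasksInBatch);  // Tavg = Ttotal / #tasks
        }
        double slowest = perTaskTime.values().stream()
                                    .mapToDouble(Double::doubleValue).max().orElse(1.0);
        Map<String, Double> ratios = new LinkedHashMap<>();
        perTaskTime.forEach((node, tt) -> ratios.put(node, slowest / tt)); // Rt(i) = Tt_max / Tt(i)
        return ratios;
    }

    public static void main(String[] args) {
        Map<String, Double> ttotal = new LinkedHashMap<>();
        ttotal.put("slave1", 400.0);   // illustrative timings in seconds
        ttotal.put("slave2", 200.0);
        ttotal.put("slave3", 100.0);
        // With these timings the ratios come out as {slave1=1.0, slave2=2.0, slave3=4.0}.
        System.out.println(computeRatios(ttotal, 10));
    }
}
```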
Algorithm for making Load-distribution-Pattern Table:
Algorithm 3: Make Load-Distribution-Pattern Table
1. For each node in the cluster:
   1.1. Calculate the node's average CPU usage (AvgCpuUsage).
   1.2. Calculate the node's average memory usage (AvgMemUsage).
2. Calculate the cluster's average CPU usage.
3. Calculate the cluster's average memory usage.
4. Fill the Load-Distribution-Patterns table with these load parameters.
When a new job is submitted to the cluster and the NameNode has no information about that job, the NameNode distributes the input data blocks based on the values in the Ratio table in the first round. In the next rounds, the whole cluster is monitored by the monitoring module, and the workload that is compatible with each node's load state is calculated using the Load-Distribution-Patterns table. In general, for every job submitted to the cluster there are three situations.
1. Scenario 1 (Statements 1 and 9.2 in Algorithm 1):
When a job is submitted to the cluster and data are written into HDFS, the NameNode first checks the Ratio table to determine whether this type of job has been performed before. If there is no record for the job type in the Ratio table, the job type is new and there is no information about it in the NameNode. To distribute the input data blocks, the NameNode therefore makes a record for the job type in the Ratio table, makes a record for the job type and its data volume in the Cluster-History table, and builds the Load-Distribution-Patterns table for that job type. After the input data have been distributed based on the information in the Ratio table, the monitoring phase starts.
2. Scenario 2 (Statements 10 and 14 in Algorithm 1):
If the Ratio table has a record for this job, this type of job has been performed before, there is a record for the job in the Cluster-History table, and a Load-Distribution-Patterns table exists for the job type. The NameNode then checks the job input volume in the Cluster-History table. If the input volume of the submitted job is not in the table, there is no distribution pattern for this input data in the Cluster-History table, and the newly written data are allocated to each node in accordance with the computing capacity recorded in the Ratio table. After assigning the input data blocks, the NameNode monitors each node's load state and compares these states with the values in the Load-Distribution-Patterns table until a workload that is more compatible with each node's load situation has been calculated by the load formulas in that table. This new workload is stored for each node in the Cluster-History table and will be distributed to the nodes in the next round of that job with the same input data.
3. Scenario 3 (Statements 11 and 13 in Algorithm 1):
If there are records for the job type and its input data volume in the Ratio table and the Cluster-History table, the NameNode has all the information needed to distribute the input data blocks to each node. The NameNode distributes the input data blocks based on the information stored in the Cluster-History table. It then monitors each node's load state and compares these states with the Load-Distribution-Patterns table, so that the workload that is more compatible with each node's current load situation is calculated by the load formulas, stored for each node in the Cluster-History table, and distributed to the nodes in the next round in which a job with the same type and input data is submitted. If all nodes in the cluster are in the normal load situation, the utilize field for that job and its input load volume in the Cluster-History table is set to "T"; otherwise, it is set to "F". These histories in the Cluster-History table help the NameNode when a job with the same workload is submitted to the cluster again: the NameNode does not need any calculation for distributing the input data volume, because all the information needed to distribute the input data blocks so that the cluster reaches a normal load situation is already stored in the Cluster-History table.
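The three scenarios can be summarized by the decision flow sketched below. The table interfaces and method names are simplified placeholders introduced for illustration; they are not the paper's data structures.

```java
import java.util.Map;
import java.util.Optional;

public class AddpDistributor {

    interface RatioTable     { Optional<Map<String, Double>> ratiosFor(String jobType); }
    interface ClusterHistory { Optional<Map<String, Long>> blockPlanFor(String jobType, long inputBytes);
                               boolean isUtilized(String jobType, long inputBytes); }

    private final RatioTable ratioTable;
    private final ClusterHistory history;

    AddpDistributor(RatioTable ratioTable, ClusterHistory history) {
        this.ratioTable = ratioTable;
        this.history = history;
    }

    void distribute(String jobType, long inputBytes) {
        Optional<Map<String, Double>> ratios = ratioTable.ratiosFor(jobType);

        if (!ratios.isPresent()) {
            // Scenario 1: unknown job type -> build the Ratio and Load-Distribution-Patterns
            // tables from test tasks, distribute by capacity ratio, then start monitoring.
            buildTablesWithTestTasks(jobType);
            distributeByRatio(jobType, inputBytes);
            monitorAndRecord(jobType, inputBytes);
        } else if (!history.blockPlanFor(jobType, inputBytes).isPresent()) {
            // Scenario 2: known job type but unseen input volume -> distribute by the
            // recorded capacity ratios, then monitor and store the adjusted plan.
            distributeByRatio(jobType, inputBytes);
            monitorAndRecord(jobType, inputBytes);
        } else {
            // Scenario 3: both job type and input volume are known -> reuse the stored plan.
            distributeByPlan(history.blockPlanFor(jobType, inputBytes).get());
            if (!history.isUtilized(jobType, inputBytes)) {
                monitorAndRecord(jobType, inputBytes);   // keep refining until balanced
            }
        }
    }

    // The methods below stand in for the corresponding steps of Algorithm 1.
    private void buildTablesWithTestTasks(String jobType) { /* Algorithms 2 and 3 */ }
    private void distributeByRatio(String jobType, long inputBytes) { /* step 4 */ }
    private void distributeByPlan(Map<String, Long> plan) { /* step 11.1 */ }
    private void monitorAndRecord(String jobType, long inputBytes) { /* steps 6 to 9 */ }
}
```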
VI. Experimental results
This section presents the experimental environment and the experimental results for the proposed algorithm.
Table 3
Each node specification
| Machine | Operating system | Memory (GB) | Number of cores | Disk (GB) |
| Master | Windows 7 | 6 | 4 | 930 |
| Slave1 | Ubuntu Linux 15.0 | 2 | 1 | 19.8 |
| Slave2 | Ubuntu Linux 15.0 | 3 | 2 | 19.8 |
| Slave3 | Ubuntu Linux 15.0 | 6 | 4 | 583.4 |
We designed a testbed to evaluate the proposed algorithm and compare it with the DDP algorithm and the default Hadoop framework. WordCount is the job type used to evaluate the performance of the proposed algorithm in a heterogeneous Hadoop cluster; it is a MapReduce application that counts the words in its input file.
The experimental environment is shown in Table 3. We use an Intel Core i5-4210U 1.70 GHz for slave1, an Intel Core i5-4210U 1.70 GHz for slave2, and an Intel Core i7-4790 3.60 GHz for slave3. We use VirtualBox 4.1.14 to create the computing nodes for slave1 and slave2. In order to achieve the effect of a heterogeneous environment, the capacities of the nodes are not the same: we set different amounts of CPU and memory on each node. In total, we create four machines, one master and three slaves. The master machine has 4 CPUs, 6 GB of memory, and a 930 GB disk; the virtual machine serving as slave1 has 1 CPU, 2 GB of memory, and a 19 GB disk; the virtual machine serving as slave2 has 2 CPUs, 3 GB of memory, and a 19 GB disk; the machine serving as slave3 has 4 CPUs, 6 GB of memory, and a 538 GB disk.
Table 3 presents the specifications of each node. All of the slave machines run Ubuntu Linux 15.0, and the master machine runs Windows 7.
Table 4
Ratio table
| Job Type | Slave1 | Slave2 | Slave3 |
| WordCount | 1 | 2 | 4 |
Table 5
Ratio table example.
| Job Type | Input Data | Slave1 | Slave2 | Slave3 |
| WordCount | α (parametric workload) | β | 2β | 4β |
| WordCount | 350 MB | 50 MB | 100 MB | 200 MB |
Table 4 shows the ratios for the WordCount job in the Ratio table. Table 5 is built from the ratios in the Ratio table and shows that if the input data is 350 MB, slave1 is assigned 50 MB, slave2 is assigned 100 MB, and slave3 is assigned 200 MB. In the proposed algorithm, the number of tasks run on each node is based on the node's number of cores: slave1 has one core, so it runs 1 task in each round; slave2 has two cores, so it runs 2 tasks in each round simultaneously; slave3 has four cores, so it runs 4 tasks in each round simultaneously. Each node processes its own input data, whose size is 50 MB for slave1, 100 MB for slave2, and 200 MB for slave3.
Fig. 4 to Fig. 11 compare the behavior of the DDP algorithm with that of the ADDP algorithm when dealing with an overload state that occurs on slave2.
Slave2 in the cluster in Fig. 4 is overloaded: according to Fig. 3, the execution times of the slaves in the normal load state are 33, 83, and 53 seconds, respectively, so when the execution time of slave2 reaches 240 seconds, slave2 is overloaded and cannot finish its job in time. In the DDP algorithm, data are allocated to each node in accordance with the computing capacity recorded in the Ratio table; DDP never monitors the load state of the nodes and never considers it when assigning data blocks. DDP therefore does not work well in overload and underload states, because it assigns data blocks to nodes based only on computing capacity, which reflects the hardware alone. In every round, DDP distributes the data blocks based only on computing capacity, so 100 MB of data is assigned to slave2 even though the node is overloaded at that moment and cannot process the assigned blocks in time.
Fig. 3. Execution time of each slave in the normal load state.
Fig. 12 shows the cluster in the overload state in the Hadoop 1.2.1 framework. We use the overload state in Hadoop to compare the Hadoop framework with the DDP and ADDP algorithms in an overload state.
Fig. 4. Execution time of each slave in the DDP overload state (Round i).
Fig. 6. Execution time of each slave in the DDP overload state (Round i+1).
Fig. 8. Execution time of each slave in the DDP overload state (Round i+2).
Fig. 5. Execution time of each slave in the ADDP overload state (Round i).
Fig. 7. Execution time of each slave in the ADDP overload state (Round i+1).
Fig. 9. Execution time of each slave in the ADDP overload state (Round i+2).
Fig. 10. Execution time of each slave in the DDP overload state (Round i+3).
Fig. 11. Execution time of each slave in the ADDP overload state (Round i+3).
In the first round (round i), the ADDP algorithm distributes the data blocks based on the computing capacity ratios; the NameNode then monitors each node's load state and compares these states with the Load-Distribution-Patterns table until a workload that is more compatible with each node's load state is calculated by the load formulas. These workloads are stored for each node in the Cluster-History table and are distributed to the nodes in the next rounds of that job with the same input data. The DDP algorithm also distributes the data blocks based on computing capacity in the first round (round i). In round 2 (round i+1), DDP again distributes the data blocks based on computing capacity, whereas ADDP distributes the data blocks based on the values stored in the Cluster-History table, which are calculated by the Load-Distribution-Patterns table formulas. Because slave2 is overloaded, 10% of slave2's workload is added to slave3's workload, so in round 2 the workloads become 50 MB, 90 MB, and 210 MB for slave1, slave2, and slave3, respectively, and the execution times become 33, 190, and 61 seconds, respectively. Since 190 seconds for slave2 is still too long, in round 3 (round i+2) another 10% of slave2's workload is added to slave3's workload; the workloads become 50 MB, 81 MB, and 219 MB, and the execution times become 33, 141, and 73 seconds, respectively. Since 141 seconds for slave2 is still too long, in round 4 (round i+3) another 10% of slave2's workload is added to slave3's workload; the workloads become 50 MB, 73 MB, and 227 MB, and the execution times become 33, 91, and 80 seconds, respectively.
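The following short program reproduces the round-by-round arithmetic above for the overloaded slave2, assuming that in each round 10% of slave2's current workload is shifted to slave3 (λ = −0.1 for slave2); rounded to whole megabytes it yields the 50/90/210, 50/81/219, and 50/73/227 MB splits reported in the text.

```java
public class OverloadAdjustmentExample {
    public static void main(String[] args) {
        double slave1 = 50, slave2 = 100, slave3 = 200;   // round i: capacity-based split (MB)
        for (int round = 1; round <= 3; round++) {
            double moved = 0.10 * slave2;                 // lambda = -0.1 for the overloaded node
            slave2 -= moved;                              // shed 10% of slave2's workload
            slave3 += moved;                              // shift it to the fastest node, slave3
            System.out.printf("round i+%d: %.0f MB, %.0f MB, %.0f MB%n",
                              round, slave1, slave2, slave3);
        }
    }
}
```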
Fig. 12. Execution time of each slave in the overload state in Hadoop.
Fig. 13. Execution time of the whole cluster in each round in the overload state for the Hadoop, DDP, and ADDP algorithms.
In round 4, slave2, which was in the overloaded state, has a workload it can handle well, and with 350 MB of input data all nodes in the cluster become load balanced. In each round of the ADDP algorithm the average execution time of the whole cluster decreases, and in round 4 it is 68 seconds, which shows that the cluster is balanced. In all rounds of the DDP algorithm the average execution time of the whole cluster is 108.66 seconds, because in DDP the data allocated to each node follow the computing capacity recorded in the Ratio table; DDP never monitors or considers the load state of the nodes when distributing input data blocks, so when a node is overloaded DDP never detects it and never changes the data block distribution adaptively. Fig. 13 shows the execution time of the whole cluster in the Hadoop framework and in the ADDP and DDP algorithms when slave2 is overloaded. The ADDP cluster execution time decreases in each round, and ADDP can make the corresponding adjustments to reach the optimal state and realize self-regulation; in contrast, the Hadoop framework and the DDP algorithm cannot detect overload in the nodes and cannot handle underload and overload states in the cluster.
VII. Conclusion
This paper proposes an adaptive dynamic data placement algorithm (ADDP) that uses the data locality of map tasks to allocate data blocks; the algorithm belongs to the class of resource-aware scheduling algorithms. Default Hadoop assumes that every node in a cluster has the same computing capacity and assigns each node the same workload; that data placement strategy is suitable only for homogeneous clusters. In a heterogeneous environment, the difference in the nodes' computing capacities may cause load imbalance and creates the need to spend additional overhead transferring unprocessed data from slow nodes to fast nodes. To improve the performance of Hadoop in heterogeneous clusters, we aim to minimize data movement between slow and fast nodes. This goal is achieved by a data placement scheme that distributes and stores data across multiple heterogeneous nodes based on their computing capacities and workloads. The proposed ADDP mechanism distributes fragments of an input file to heterogeneous nodes based on their computing capacities and then calculates an appropriate workload for each node based on its load parameters, thereby improving data locality and reducing the additional overhead to enhance Hadoop performance. Our approach improves the performance of heterogeneous Hadoop clusters and benefits both the DataNodes and the NameNode: on the DataNodes' side, task execution time is reduced, resource utilization is increased, and node performance is more stable.
References
[1] G. Turkington, Hadoop Beginner's Guide. Packt Publishing Ltd., 2013.
[2] A. Holmes, Hadoop in Practice. Manning Publications Co., 2012.
[3] R. D. Schneider, Hadoop for Dummies Special Edition. John Wiley & Sons Canada, 2012.
[4] C.-W. Lee, K.-Y. Hsieh, S.-Y. Hsieh, and H.-C. Hsiao, "A dynamic data placement strategy for Hadoop in heterogeneous environments," Big Data Research, vol. 1, pp. 14-22, 2014.
[5] A. Hadoop, "Welcome to Apache Hadoop," retrieved from http://hadoop.apache.org, 2014.
[6] R. Xiong, J. Luo, and F. Dong, "Optimizing data placement in heterogeneous Hadoop clusters," Cluster Computing, vol. 18, no. 4, pp. 1465-1480, 2015.
[7] J. Xie, S. Yin, X. Ruan, Z. Ding, Y. Tian, J. Majors, et al., "Improving MapReduce performance through data placement in heterogeneous Hadoop clusters," in Proc. IEEE Int. Symp. on Parallel & Distributed Processing, Workshops and PhD Forum (IPDPSW), 2010, pp. 1-9.
[8] K. Singh and R. Kaur, "Hadoop: addressing challenges of big data," in Proc. IEEE Int. Advance Computing Conference (IACC), 2014, pp. 686-689.
[9] X. Xu, L. Cao, and X. Wang, "Adaptive task scheduling strategy based on dynamic workload adjustment for heterogeneous Hadoop clusters," 2014.
[10] P. Xu, H. Wang, and M. Tian, "New scheduling algorithm in Hadoop based on resource aware," in Practical Applications of Intelligent Systems. Springer, 2014, pp. 1011-1020.
[11] Z. Tang, J. Zhou, K. Li, and R. Li, "MTSD: A task scheduling algorithm for MapReduce base on deadline constraints," in Proc. IEEE 26th Int. Parallel and Distributed Processing Symposium Workshops & PhD Forum (IPDPSW), 2012.
Avishan Sharafi received the B.E. and M.S. degrees in computer software engineering from Islamic Azad University, Central Tehran Branch, and Islamic Azad University, South Tehran Branch, Iran, in 2013 and 2016, respectively.
Ali Rezaee received the Ph.D. degree in software architectures from Islamic Azad University Tehran Science and Research Branch.