Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts

Tuesday, August 19, 2014

HBase write throughput

Hbase_write_throughput

HBase write throughput as a function of number of column qualifiers

In Hbase, every cell value is stored along with all its cardinalities as follows,

rowkey:columnfamily:columnqualifier:timestamp:value

Hypothetically, let us assume the following

Data payload size               = 10 kb
rowkey size                     = 64 kb
columnfamily:columnname size    = 60 kb

In order to write a row with say 2 columns, the total amount of bytes transferred and written will be

2 * ( 5kb + 64 kb + 60 kb) = 258 kb (Total 10kb of payload split between two columns)

In order to write a row with say 1 column, the total will be

1 * (10 + 64 + 60) = 134 kb.

Larger the size, more data transfer across network, memstore will get full more often and hence will need more flush. This will negatively impact write throughput.

Verfiying this behaviour using HBase Load Testing tool,

Summary

- Rows          :   10k             10k             10K
- Columns       :   2               5               10
- PayLoad       :   512 kb          200 kb          100 kb
- Total PayLoad :   ~1000 kb        1000 kb         1000 kb     
- Throughput    :   405 Keys/s      252 keys/s      175 keys/s

Details

We start with 10000 rows, 2 columns with a payload of 512kb for every cell, indicated by - write 2:512:20

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 2:512:20 -num_keys 10000
14/08/19 11:47:26 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..4
Data size per column: 256..768

Below is a log captured at 5 seconds interval, at the end of 20 seconds, we see that write throughput is 405 keys/s

Starting to write data...
14/08/19 11:47:39 INFO util.MultiThreadedAction: [W:20] Keys=1663, cols=5.6 K, time=00:00:05 Overall: [keys/s= 332, latency=58 ms] Current: [keys/s=332, latency=58 ms], wroteUpTo=-1
14/08/19 11:47:44 INFO util.MultiThreadedAction: [W:20] Keys=3641, cols=12.3 K, time=00:00:10 Overall: [keys/s= 361, latency=54 ms] Current: [keys/s=395, latency=51 ms], wroteUpTo=-1
14/08/19 11:47:49 INFO util.MultiThreadedAction: [W:20] Keys=5769, cols=19.5 K, time=00:00:15 Overall: [keys/s= 382, latency=51 ms] Current: [keys/s=425, latency=46 ms], wroteUpTo=-1
14/08/19 11:47:54 INFO util.MultiThreadedAction: [W:20] Keys=8128, cols=27.6 K, time=00:00:20 Overall: [keys/s= 405, latency=49 ms] Current: [keys/s=471, latency=42 ms], wroteUpTo=-1
Failed to write keys: 0

We do it again with 5 columns, 200kb payload and 10k rows

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 5:200:20 -num_keys 10000
14/08/19 14:38:20 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..10
Data size per column: 100..300
.
.
Starting to write data...
14/08/19 14:38:31 INFO util.MultiThreadedAction: [W:20] Keys=901, cols=5.6 K, time=00:00:05 Overall: [keys/s= 180, latency=106 ms] Current: [keys/s=180, latency=106 ms], wroteUpTo=-1
14/08/19 14:38:36 INFO util.MultiThreadedAction: [W:20] Keys=1979, cols=12.4 K, time=00:00:10 Overall: [keys/s= 197, latency=99 ms] Current: [keys/s=215, latency=92 ms], wroteUpTo=-1
14/08/19 14:38:41 INFO util.MultiThreadedAction: [W:20] Keys=3070, cols=19.3 K, time=00:00:15 Overall: [keys/s= 204, latency=96 ms] Current: [keys/s=218, latency=91 ms], wroteUpTo=-1
14/08/19 14:38:46 INFO util.MultiThreadedAction: [W:20] Keys=4367, cols=27.7 K, time=00:00:20 Overall: [keys/s= 218, latency=90 ms] Current: [keys/s=259, latency=77 ms], wroteUpTo=-1
14/08/19 14:38:51 INFO util.MultiThreadedAction: [W:20] Keys=5857, cols=36.9 K, time=00:00:25 Overall: [keys/s= 234, latency=84 ms] Current: [keys/s=298, latency=66 ms], wroteUpTo=-1
14/08/19 14:38:56 INFO util.MultiThreadedAction: [W:20] Keys=7373, cols=46.4 K, time=00:00:30 Overall: [keys/s= 245, latency=80 ms] Current: [keys/s=303, latency=65 ms], wroteUpTo=-1
14/08/19 14:39:01 INFO util.MultiThreadedAction: [W:20] Keys=8843, cols=55.7 K, time=00:00:35 Overall: [keys/s= 252, latency=78 ms] Current: [keys/s=294, latency=67 ms], wroteUpTo=-1
Failed to write keys: 0

As seen above, the write throughput has reduced to 252 keys/s.

Further increasing the number of columns to 10, with 100K payload, the write throughput is reduced to 175 keys/s

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 10:100:20 -num_keys 10000
14/08/19 14:34:54 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..20
Data size per column: 50..150

Starting to write data...
14/08/19 14:35:07 INFO util.MultiThreadedAction: [W:20] Keys=582, cols=6.4 K, time=00:00:05 Overall: [keys/s= 116, latency=168 ms] Current: [keys/s=116, latency=168 ms], wroteUpTo=-1
14/08/19 14:35:12 INFO util.MultiThreadedAction: [W:20] Keys=1157, cols=13.0 K, time=00:00:10 Overall: [keys/s= 115, latency=171 ms] Current: [keys/s=115, latency=173 ms], wroteUpTo=-1
14/08/19 14:35:17 INFO util.MultiThreadedAction: [W:20] Keys=1884, cols=21.0 K, time=00:00:15 Overall: [keys/s= 125, latency=158 ms] Current: [keys/s=145, latency=137 ms], wroteUpTo=-1
14/08/19 14:35:22 INFO util.MultiThreadedAction: [W:20] Keys=2687, cols=30.0 K, time=00:00:20 Overall: [keys/s= 134, latency=147 ms] Current: [keys/s=160, latency=123 ms], wroteUpTo=-1
14/08/19 14:35:27 INFO util.MultiThreadedAction: [W:20] Keys=3558, cols=39.8 K, time=00:00:25 Overall: [keys/s= 142, latency=139 ms] Current: [keys/s=174, latency=115 ms], wroteUpTo=-1
14/08/19 14:35:32 INFO util.MultiThreadedAction: [W:20] Keys=4513, cols=50.5 K, time=00:00:30 Overall: [keys/s= 150, latency=132 ms] Current: [keys/s=191, latency=104 ms], wroteUpTo=-1
14/08/19 14:35:37 INFO util.MultiThreadedAction: [W:20] Keys=5410, cols=60.5 K, time=00:00:35 Overall: [keys/s= 154, latency=128 ms] Current: [keys/s=179, latency=111 ms], wroteUpTo=-1
14/08/19 14:35:42 INFO util.MultiThreadedAction: [W:20] Keys=6322, cols=70.8 K, time=00:00:40 Overall: [keys/s= 157, latency=126 ms] Current: [keys/s=182, latency=109 ms], wroteUpTo=-1
14/08/19 14:35:47 INFO util.MultiThreadedAction: [W:20] Keys=7280, cols=81.8 K, time=00:00:45 Overall: [keys/s= 161, latency=123 ms] Current: [keys/s=191, latency=104 ms], wroteUpTo=-1
14/08/19 14:35:52 INFO util.MultiThreadedAction: [W:20] Keys=8496, cols=95.6 K, time=00:00:50 Overall: [keys/s= 169, latency=117 ms] Current: [keys/s=243, latency=82 ms], wroteUpTo=-1
14/08/19 14:35:57 INFO util.MultiThreadedAction: [W:20] Keys=9632, cols=108.1 K, time=00:00:55 Overall: [keys/s= 175, latency=113 ms] Current: [keys/s=227, latency=87 ms], wroteUpTo=-1
Failed to write keys: 0

Thursday, March 6, 2014

Unsupervised outlier detection for categorical variable

In this article, I will discuss about an outlier detection technique called Attribute Value Frequency (AVF) for categorical data. This is a part of series of blogs I intend to write on outlier detection methods. AVF method is attributed to Anna Koufakau (Scalable and Efficient Outlier Detection in Large Distributed Data Sets with Mixed-Type Attributes).

The intuition behind AVF; outliers tend to occur infrequently in a dataset. Tuples are scored based on frequency of individual attribute values. Tuples with low AVF scores are marked as outliers.
Assume X = {x1,x2,x3…..xn} is the given dataset having n tuples ,each tuple with m attributes,
 xi = {xi1,xi2…..xim} where  1 <= i <= n.



The function in the formula is frequency count of attribute j’s value. If a tuple has rare values for each of its attribute, due to summation, its overall score will be low. Thus at the end of the exercise, by arranging the tuples in increasing order of AVF score, the top K tuples are identified as outliers.
Given the summation nature of the calculation, the algorithm can be coded up using map reduce paradigm. The pseudo-code below implements AVF using map- reduce constructs.

Map (LineNumber,Row)
                For each col in Row
                                Emit ( colNumber + colValue,1)

Reduce ( key (colNumber + colValue ),  List )
                Total = sum ( all list values)
                Emit(key,Total)

The output of Reduce is used in the next map reduce. Assume it’s a hashmap named as CountDict.

Map (LineNumber,Row)
                Sum = 0
For each col in Row
                Key = colNumber + colValue
                Count = CountDict(Key)
                                Sum = sum + count
                Emit(Sum,Row)
Reduce (Key (Sum),List)
                For row in List
                                Emit (Key, row)

References


  1.  Scalable and Efficient Outlier Detection in Large Distributed Data Sets with Mixed-Type Attributes