Tuesday, August 19, 2014

HBase write throughput

HBase write throughput as a function of number of column qualifiers

In HBase, every cell value is stored along with its full coordinates, as follows:

rowkey:columnfamily:columnqualifier:timestamp:value

Hypothetically, let us assume the following

Data payload size                   = 10 kb
rowkey size                         = 64 kb
columnfamily:columnqualifier size   = 60 kb

In order to write a row with, say, 2 columns, the total number of bytes transferred and written will be

2 * (5 kb + 64 kb + 60 kb) = 258 kb (the total 10 kb of payload is split between the two columns)

In order to write a row with a single column, the total will be

1 * (10 kb + 64 kb + 60 kb) = 134 kb

The larger these sizes, the more data is transferred across the network; the MemStore fills up faster and needs to flush more often. This negatively impacts write throughput.
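To make the arithmetic concrete, here is a tiny back-of-the-envelope calculation in Python using the hypothetical sizes above (illustrative only; the constants and function name are mine):

# hypothetical sizes from the example above (all in kb)
PAYLOAD  = 10    # total payload per row
ROWKEY   = 64    # rowkey size
COL_NAME = 60    # columnfamily:columnqualifier size

def kb_written(num_columns):
    # every column repeats the rowkey and the column name,
    # while the payload is split across the columns
    per_column_payload = PAYLOAD / float(num_columns)
    return num_columns * (per_column_payload + ROWKEY + COL_NAME)

print kb_written(1)   # 134.0 kb
print kb_written(2)   # 258.0 kb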

Verifying this behaviour using the HBase load testing tool (LoadTestTool):

Summary

- Rows              :   10k             10k             10k
- Columns           :   2               5               10
- Payload/column    :   512 bytes       200 bytes       100 bytes
- Total payload/row :   ~1000 bytes     ~1000 bytes     ~1000 bytes
- Throughput        :   405 keys/s      252 keys/s      175 keys/s

Details

We start with 10,000 rows and 2 columns per row, with an average payload of 512 bytes per column and 20 writer threads, indicated by -write 2:512:20

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 2:512:20 -num_keys 10000
14/08/19 11:47:26 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..4
Data size per column: 256..768

Below is the log, captured at 5 second intervals; at the end of 20 seconds the overall write throughput is 405 keys/s

Starting to write data...
14/08/19 11:47:39 INFO util.MultiThreadedAction: [W:20] Keys=1663, cols=5.6 K, time=00:00:05 Overall: [keys/s= 332, latency=58 ms] Current: [keys/s=332, latency=58 ms], wroteUpTo=-1
14/08/19 11:47:44 INFO util.MultiThreadedAction: [W:20] Keys=3641, cols=12.3 K, time=00:00:10 Overall: [keys/s= 361, latency=54 ms] Current: [keys/s=395, latency=51 ms], wroteUpTo=-1
14/08/19 11:47:49 INFO util.MultiThreadedAction: [W:20] Keys=5769, cols=19.5 K, time=00:00:15 Overall: [keys/s= 382, latency=51 ms] Current: [keys/s=425, latency=46 ms], wroteUpTo=-1
14/08/19 11:47:54 INFO util.MultiThreadedAction: [W:20] Keys=8128, cols=27.6 K, time=00:00:20 Overall: [keys/s= 405, latency=49 ms] Current: [keys/s=471, latency=42 ms], wroteUpTo=-1
Failed to write keys: 0

We run it again with 5 columns, an average payload of 200 bytes per column and 10k rows

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 5:200:20 -num_keys 10000
14/08/19 14:38:20 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..10
Data size per column: 100..300
.
.
Starting to write data...
14/08/19 14:38:31 INFO util.MultiThreadedAction: [W:20] Keys=901, cols=5.6 K, time=00:00:05 Overall: [keys/s= 180, latency=106 ms] Current: [keys/s=180, latency=106 ms], wroteUpTo=-1
14/08/19 14:38:36 INFO util.MultiThreadedAction: [W:20] Keys=1979, cols=12.4 K, time=00:00:10 Overall: [keys/s= 197, latency=99 ms] Current: [keys/s=215, latency=92 ms], wroteUpTo=-1
14/08/19 14:38:41 INFO util.MultiThreadedAction: [W:20] Keys=3070, cols=19.3 K, time=00:00:15 Overall: [keys/s= 204, latency=96 ms] Current: [keys/s=218, latency=91 ms], wroteUpTo=-1
14/08/19 14:38:46 INFO util.MultiThreadedAction: [W:20] Keys=4367, cols=27.7 K, time=00:00:20 Overall: [keys/s= 218, latency=90 ms] Current: [keys/s=259, latency=77 ms], wroteUpTo=-1
14/08/19 14:38:51 INFO util.MultiThreadedAction: [W:20] Keys=5857, cols=36.9 K, time=00:00:25 Overall: [keys/s= 234, latency=84 ms] Current: [keys/s=298, latency=66 ms], wroteUpTo=-1
14/08/19 14:38:56 INFO util.MultiThreadedAction: [W:20] Keys=7373, cols=46.4 K, time=00:00:30 Overall: [keys/s= 245, latency=80 ms] Current: [keys/s=303, latency=65 ms], wroteUpTo=-1
14/08/19 14:39:01 INFO util.MultiThreadedAction: [W:20] Keys=8843, cols=55.7 K, time=00:00:35 Overall: [keys/s= 252, latency=78 ms] Current: [keys/s=294, latency=67 ms], wroteUpTo=-1
Failed to write keys: 0

As seen above, the write throughput has dropped to 252 keys/s.

Further increasing the number of columns to 10, with an average payload of 100 bytes per column, reduces the write throughput to 175 keys/s

$ hbase org.apache.hadoop.hbase.util.LoadTestTool -write 10:100:20 -num_keys 10000
14/08/19 14:34:54 INFO Configuration.deprecation: hadoop.native.lib is deprecated. Instead, use io.native.lib.available
Key range: [0..9999]
Multi-puts: false
Columns per key: 1..20
Data size per column: 50..150

Starting to write data...
14/08/19 14:35:07 INFO util.MultiThreadedAction: [W:20] Keys=582, cols=6.4 K, time=00:00:05 Overall: [keys/s= 116, latency=168 ms] Current: [keys/s=116, latency=168 ms], wroteUpTo=-1
14/08/19 14:35:12 INFO util.MultiThreadedAction: [W:20] Keys=1157, cols=13.0 K, time=00:00:10 Overall: [keys/s= 115, latency=171 ms] Current: [keys/s=115, latency=173 ms], wroteUpTo=-1
14/08/19 14:35:17 INFO util.MultiThreadedAction: [W:20] Keys=1884, cols=21.0 K, time=00:00:15 Overall: [keys/s= 125, latency=158 ms] Current: [keys/s=145, latency=137 ms], wroteUpTo=-1
14/08/19 14:35:22 INFO util.MultiThreadedAction: [W:20] Keys=2687, cols=30.0 K, time=00:00:20 Overall: [keys/s= 134, latency=147 ms] Current: [keys/s=160, latency=123 ms], wroteUpTo=-1
14/08/19 14:35:27 INFO util.MultiThreadedAction: [W:20] Keys=3558, cols=39.8 K, time=00:00:25 Overall: [keys/s= 142, latency=139 ms] Current: [keys/s=174, latency=115 ms], wroteUpTo=-1
14/08/19 14:35:32 INFO util.MultiThreadedAction: [W:20] Keys=4513, cols=50.5 K, time=00:00:30 Overall: [keys/s= 150, latency=132 ms] Current: [keys/s=191, latency=104 ms], wroteUpTo=-1
14/08/19 14:35:37 INFO util.MultiThreadedAction: [W:20] Keys=5410, cols=60.5 K, time=00:00:35 Overall: [keys/s= 154, latency=128 ms] Current: [keys/s=179, latency=111 ms], wroteUpTo=-1
14/08/19 14:35:42 INFO util.MultiThreadedAction: [W:20] Keys=6322, cols=70.8 K, time=00:00:40 Overall: [keys/s= 157, latency=126 ms] Current: [keys/s=182, latency=109 ms], wroteUpTo=-1
14/08/19 14:35:47 INFO util.MultiThreadedAction: [W:20] Keys=7280, cols=81.8 K, time=00:00:45 Overall: [keys/s= 161, latency=123 ms] Current: [keys/s=191, latency=104 ms], wroteUpTo=-1
14/08/19 14:35:52 INFO util.MultiThreadedAction: [W:20] Keys=8496, cols=95.6 K, time=00:00:50 Overall: [keys/s= 169, latency=117 ms] Current: [keys/s=243, latency=82 ms], wroteUpTo=-1
14/08/19 14:35:57 INFO util.MultiThreadedAction: [W:20] Keys=9632, cols=108.1 K, time=00:00:55 Overall: [keys/s= 175, latency=113 ms] Current: [keys/s=227, latency=87 ms], wroteUpTo=-1
Failed to write keys: 0

Monday, August 11, 2014

Unsupervised Feature Learning - Sparse Filtering

Sparse Filtering

Sparse filtering is an unsupervised feature learning method; see the original paper for details.
The code is available on my GitHub.
It is a small Python implementation of the sparse filtering algorithm; the implementation depends on NumPy and scikit-learn, the libraries used in the snippets below.
As discussed in the paper, we use a soft-absolute activation function.
def soft_absolute(v):
    # epsilon is a small smoothing constant (e.g. 1e-8) that keeps the function differentiable at zero
    return np.sqrt(v**2 + epsilon)
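The sfiltering function used later in this post comes from the linked repository and is not reproduced here. Purely to illustrate what such a layer optimizes, below is a minimal sketch of the sparse filtering objective from the paper; the function names and the use of SciPy's generic L-BFGS-B optimizer are my assumptions, not the author's implementation.

import numpy as np
from scipy.optimize import minimize

epsilon = 1e-8

def sparse_filtering_objective(w_flat, X, n_features):
    # X is (n_samples, n_dims); W maps each input to n_features learned features
    W = w_flat.reshape(n_features, X.shape[1])
    F = soft_absolute(W.dot(X.T))                                     # (n_features, n_samples)
    F = F / np.sqrt(np.sum(F ** 2, axis=1, keepdims=True) + epsilon)  # normalize each feature (row)
    F = F / np.sqrt(np.sum(F ** 2, axis=0, keepdims=True) + epsilon)  # normalize each example (column)
    return np.sum(F)                                                  # L1 penalty on the normalized features

def sfiltering_sketch(X, n_features):
    # learn W with a generic optimizer (finite-difference gradients: slow but simple)
    w0 = 0.01 * np.random.randn(n_features * X.shape[1])
    res = minimize(sparse_filtering_objective, w0, args=(X, n_features),
                   method='L-BFGS-B', options={'maxiter': 200})
    W = res.x.reshape(n_features, X.shape[1])
    return soft_absolute(W.dot(X.T)).T   # transformed data, shape (n_samples, n_features)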
The input X has dimensions (n_samples, n_dimensions). For testing purposes, we use scikit-learn's make_classification function to create 500 samples with 100 features.
from sklearn.datasets import make_classification

def load_data():
    X, Y = make_classification(n_samples=500, n_features=100)
    return X, Y
We use a simple Support Vector Machine classifier (scikit-learn's SVC) for the final classification.
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

def simple_model(X, Y):
    # train and evaluate on the same data, as in the original post
    clf_org_x = SVC()
    clf_org_x.fit(X, Y)
    predict = clf_org_x.predict(X)
    acc = accuracy_score(Y, predict)
    return acc
We train a two-layer network.
X,Y = load_data()
acc = simple_model(X,Y)

X_trans = sfiltering(X,25)

acc1= simple_model(X_trans,Y)

X_trans1 = sfiltering(X_trans,10)

acc2= simple_model(X_trans1,Y)

print "Without sparsefiltering, accuracy = %f "%(acc)
print "One Layer Accuracy, = %f, Increase = %f"%(acc1,acc1-acc)
print "Two Layer Accuracy,  = %f, Increase = %f"%(acc2,acc2-acc1)
At the first layer we create 25 features; at the second layer we reduce them to 10. Finally, a (500, 10) feature matrix is fed to the SVC classifier.
Without sparsefiltering, accuracy = 0.986000 
One Layer Accuracy, = 1.000000, Increase = 0.014000
Two Layer Accuracy,  = 1.000000, Increase = 0.000000
With a single layer of sparse filtering, the accuracy reaches 100%; the second layer is redundant here.
Other implementations are available on the web.

Saturday, August 2, 2014

Unsupervised key phrase extraction from document - RAKE

RAKE

RAKE (Rapid Automatic Keyword Extraction) is an extremely efficient keyword extraction algorithm that operates on individual documents. It is language and domain independent.

The literature is abundant with methods that use noun phrase chunks, POS tags, n-gram statistics and the like.

Given a document, a stop word list and a list of phrase delimiters, RAKE first extracts candidate phrases.

import nltk
from nltk.corpus import stopwords

stop_words = set(stopwords.words('english'))
delimiters = set([',', '.', '?', ':', ';'])

phrases = []

for line in sentences:
    words = nltk.word_tokenize(line)
    phrase = ''
    for word in words:
        if word not in stop_words and word not in delimiters:
            phrase += word + ' '
        else:
            if phrase != '':
                phrases.append(phrase.strip())
            phrase = ''
    # keep any candidate phrase left over at the end of the sentence
    if phrase != '':
        phrases.append(phrase.strip())

We take the stop word list from NLTK and use a list of special characters as phrase delimiters. Every sentence is split into chunks wherever a stop word or a phrase delimiter occurs, as in the example below.
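For illustration (this example sentence is mine, not from the original post), NLTK treats "of" as a stop word and "." is one of the delimiters, so the loop above yields three candidate phrases:

sentence = "Compatibility of systems of linear constraints."
# candidate phrases produced by the extraction loop:
#   ['Compatibility', 'systems', 'linear constraints']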

The next step is to find the frequency of the individual words in these phrases. Frequency is simply the number of occurrences of the word.

from collections import defaultdict

word_freq = defaultdict(int)
word_degree = defaultdict(int)
word_score = defaultdict(float)

for phrase in phrases:
    words = phrase.split(' ')
    phrase_length = len(words)
    for word in words:
        word_freq[word] += 1
        word_degree[word] += phrase_length

The degree of a word is the sum of the lengths of all the phrases in which the word occurs. Finally, each word is scored as

score(word) = degree(word) / freq(word).

for word, freq in word_freq.items():
    degree = word_degree[word]
    score = (1.0 * degree) / (1.0 * freq)
    word_score[word] = score

Phrase scores are calculated as the sum of the individual word scores in that phrase. Phrases with the highest scores are considered to be the keyphrases for the document.
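The post does not show this last step; a minimal sketch, assuming the phrases list and word_score dictionary built above, could look like the following.

phrase_scores = []
for phrase in phrases:
    score = sum(word_score[word] for word in phrase.split(' '))
    phrase_scores.append((score, phrase))

# the highest-scoring phrases are the candidate keyphrases
for score, phrase in sorted(phrase_scores, reverse=True)[:10]:
    print phrase, score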

The RAKE algorithm is explained in the book Text Mining: Applications and Theory.

There are several Python implementations available.

The code used in this post is available on GitHub.