Practical illustration of Map-Reduce (Hadoop-style), on real data

Here I will discuss a general framework to process web traffic data. The concept of Map-Reduce will be naturally introduced. Let's say you want to design a system to score Internet clicks, to measure the chance for a click to convert, or the chance to be fraudulent or un-billable. The data comes from a publisher or ad network; it could be Google. Conversion data is limited and poor (some conversions are tracked, some are not; some conversions are soft, just a click-out, and conversion rate is above 10%; some conversions are hard, for instance a credit card purchase, and conversion rate is below 1%). Here, for now, we just ignore the conversion data and focus on the low hanging fruits: click data. Other valuable data is impression data (for instance a click not associated with an impression is very suspicious). But impression data is huge, 20 times bigger than click data. We ignore impression data here.

Here, we work with complete click data collected over a 7-day time period. Let's assume that we have 50 million clicks in the data set. Working with a sample is risky, because much of the fraud is spread across a large number of affiliates, and involve clusters (small and large) of affiliates, and tons of IP addresses but few clicks per IP per day (low frequency).

The data set (ideally, a tab-separated text file, as CSV files can cause field misalignment here due to text values containing field separators) contains 60 fields: keyword (user query or advertiser keyword blended together, argh...), referral (actual referral domain or ad exchange domain, blended together, argh...), user agent (UA, a long string; UA is also known as browser, but it can be a bot), affiliate ID, partner ID (a partner has multiple affiliates), IP address, time, city and a bunch of other parameters.

The first step is to extract the relevant fields for this quick analysis (a few days of work). Based on domain expertise, we retained the following fields:

  • IP address
  • Day
  • UA (user agent) ID - so we created a look-up table for UA's
  • Partner ID
  • Affiliate ID

These 5 metrics are the base metrics to create the following summary table. Each (IP, Day, UA ID, Partner ID, Affiliate ID) represents our atomic (most granular) data bucket.

Building a summary table: the Map step

The summary table will be built as a text file (just like in Hadoop), the data key (for joins or groupings) being (IP, Day, UA ID, Partner ID, Affiliate ID). For each atomic bucket (IP, Day, UA ID, Partner ID, Affiliate ID) we also compute:

  • number of clicks
  • number of unique UA's
  • list of UA

The list of UA's, for a specific bucket, looks like ~6723|9~45|1~784|2, meaning that in the bucket in question, there are three browsers (with ID 6723, 45 and 784), 12 clicks (9 + 1 + 2), and that (for instance) browser 6723 generated 9 clicks.

In Perl, these computations are easily performed, as you sequentially browse the data. The following updates the click count:


Updating the list of UA's associated with a bucket is a bit less easy, but still almost trivial.

The problem is that at some point, the hash table becomes too big and will slow down your Perl script to a crawl. The solution is to split the big data in smaller data sets (called subsets), and perform this operation separately on each subset. This is called the Map step, in Map-Reduce. You need to decide which fields to use for the mapping. Here, IP address is a good choice because it is very granular (good for load balance), and the most important metric. We can split the IP address field in 20 ranges based on the first byte of the IP address. This will result in 20 subsets. The splitting in 20 subsets is easily done by browsing sequentially the big data set with a Perl script, looking at the IP field, and throwing each observation in the right subset based on the IP address.

Building a summary table: the Reduce step

Now, after producing the 20 summary tables (one for each subset), we need to merge them together. We can't simply use hash table here, because they will grow too large and it won't work - the reason why we used the Map step in the first place.

Here's the work around:

Sort each of the 20 subsets by IP address. Merge the sorted subsets to produce a big summary table T. Merging sorted data is very easy and efficient: loop over the 20 sorted subsets with an inner loop over the observations in each sorted subset; keep 20 pointers, one per sorted subset, to keep track of where you are in your browsing, for each subset, at any given iteration.

Now you have a big summary table T, with multiple occurrences of the same atomic bucket, for many atomic buckets. Multiple occurrences of a same atomic bucket must be aggregated. To do so, browse sequentially table T (stored as text file). You are going to use hash tables, but small ones this time. Let's say that you are in the middle of a block of data corresponding to a same IP address, say (remember, T is ordered by IP address). Use


to update (that is, aggregate click count) corresponding to atomic bucket (, Day, UA ID, Partner ID, Affiliate ID). Note one big difference between $hash_clicks and $hash_clicks_small: IP address is not part of the key in the latter one, resulting in hash tables millions of time smaller. When you hit a new IP address when browsing T, just save the stats stored in $hash_small and satellites small hash tables for the previous IP address, free the memory used by these hash tables, and re-use them for the next IP address found in table T, until you arrived at the end of table T.

Now you have the summary table you wanted to build, let's call it S. The initial data set had 50 million clicks and dozens of fields, some occupying a lot of space. The summary table is much more manageable and compact, although still far too large to fit in Excel.

Creating rules

The rule set for fraud detection will be created based only on data found in the final summary table S (and additional high-level summary tables derived from S alone). An example of rule is "IP address is active 3+ days over the last 7 days". Computing the number of clicks and analyzing this aggregated click bucket, is straightforward, using table S. Indeed, the table S can be seen as a "cube" (from a database point of view), and the rules that you create simply narrow down on some of the dimensions of this cube. In many ways, creating a rule set consists in building less granular summary tables, on top of S, and testing. 


IP addresses can be mapped to an IP category, and IP category should become a fundamental metric in your rule system. You can compute summary statistics by IP category. See details in my article Internet topology mapping. Finally, automated nslookups should be performed on thousands of test IP addresses (both bad and good, both large and small in volume).

Likewise, UA's (user agents) can be categorized, a nice taxonomy problem by itself. At the very least, use three UA categories: mobile, (nice) crawler that identifies itself as a crawler, and other. The use of UA list such as ~6723|9~45|1~784|2 (see above) for each atomic bucket is to identify schemes based on multiple UA's per IP, as well as the type of IP proxy (good or bad) we are dealing with.

Historical note: Interestingly, the first time I was introduced to a Map-Reduce framework was when  I worked at Visa in 2002, processing rather large files (credit card transactions). These files contained 50 million observations. SAS could not sort them, it would make SAS crashes because of the many and large temporary files SAS creates to do  big sort. Essentially it would fill the hard disk. Remember, this was 2002 and it was an earlier version of SAS, I think version 6. Version 8 and above are far superior. Anyway, to solve this sort issue - an O(n log n) problem in terms of computational complexity - we used the "split / sort subsets / merge and aggregate" approach described in my article.


I showed you how to extract/summarize data from large log files, using Map-Reduce, and then creating an hierarchical data base with multiple, hierarchical levels of summarization, starting with a granular summary table S containing all the information needed at the atomic level (atomic data buckets), all the way up to high-level summaries corresponding to rules. In the process, only text files are used. You can call this an NoSQL Hierarchical Database (NHD). The granular table S (and the way it is built) is similar to the Hadoop architecture.

Views: 37668


You need to be a member of Data Science Central to add comments!

Join Data Science Central

Comment by Altan Atabarezz on June 13, 2015 at 8:33am

neat example, I guess coding this example in python would make

a very nice little big data course...

Comment by Vincent Granville on January 26, 2014 at 9:41am
Erich Schubert wrote:
IMHO you should emphasize the shuffle step more. As far as I can tell, MapReduce is working good only when you make good use of the shuffle. If you e.g. inflate your data to O(n^2) for the shuffle, it hurts badly (see also the discussion of tradeoffs and replication rate in: +Jeffrey Ullman "Designing good mapreduce Algorithms." XRDS: Crossroads, The ACM Magazine for Students 19.1 (2012): 30-34.)
In your case, using the IP as key of course yields a replication factor of 1, so not a lot of room for improvement left. But in many interesting cases, we cant use a straightforward group-by-key and then get independent partitions.
I found that working on the interim data representation (between map and reduce) is the key to optimize mapreduce jobs (apart from tuning your cluster configuration), 

© 2021   TechTarget, Inc.   Powered by

Badges  |  Report an Issue  |  Privacy Policy  |  Terms of Service