Thread starter: Sky-Tiger

Discretized Streams: A Fault-Tolerant Model for Scalable Stream Processing

11#  Posted by the thread starter on 2013-7-24 22:01
To offer groups requires two steps. The first step is to add information about which
articles are in which groups, and the second is to actually fetch articles from a group.
We’ll use a SET for each group, which stores the article IDs of all articles in that group.
In listing 1.9, we see a function that allows us to add and remove articles from groups.
Listing 1.9  The add_remove_groups() function

def add_remove_groups(conn, article_id, to_add=[], to_remove=[]):
    article = 'article:' + article_id          # Construct the article key like we did in post_article.
    for group in to_add:
        conn.sadd('group:' + group, article)   # Add the article to the groups it should be a part of.
    for group in to_remove:
        conn.srem('group:' + group, article)   # Remove the article from the groups it should be removed from.
At first glance, these SETs with article information may not seem that useful. So far,
you’ve only seen the ability to check whether a SET has an item. But Redis has the
capability to perform operations involving multiple SETs, and in some cases, Redis can
perform operations between SETs and ZSETs.
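As a taste of what that enables, here is a minimal sketch (not one of the book's listings) that intersects a group's SET with the chapter's 'score:' ZSET via ZINTERSTORE, assuming the redis-py client and the key naming used earlier in the chapter; the function name and the 60-second cache are illustrative:

import redis

def group_article_scores(conn, group):
    dest = 'score:' + group
    if not conn.exists(dest):
        # Intersect the group's SET with the 'score:' ZSET; SET members get a
        # default score, so aggregating with 'max' keeps each article's real score.
        conn.zinterstore(dest, ['group:' + group, 'score:'], aggregate='max')
        conn.expire(dest, 60)                                  # cache the result for 60 seconds
    return conn.zrevrange(dest, 0, 9, withscores=True)        # the 10 highest-scored articles

conn = redis.Redis()
print(group_article_scores(conn, 'programming'))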

12#  Posted by the thread starter on 2013-7-24 22:05
You'll remember from chapters 1 and 2 that STRINGs hold sequences of bytes, not
significantly different from strings in many programming languages, or even C/C++-style
char arrays. In Redis, STRINGs are used to store three types of values:
Byte string values
Integer values
Floating-point values



Integers and floats can be incremented or decremented by an arbitrary numeric value
(integers turning into floats as necessary). Integers have ranges that are equivalent to
the platform’s long integer range (signed 32-bit integers on 32-bit platforms, and
signed 64-bit integers on 64-bit platforms), and floats have ranges and values limited
to IEEE 754 floating-point doubles. This three-way ability to look at the simplest of
Redis values can be an advantage; it offers more flexibility in data representation than
if only byte string values were allowed.
In this section, we’ll talk about the simplest structure available to Redis, the
STRING. We’ll cover the basic numeric increment and decrement operations, followed
later by the bit and substring manipulation calls, and you’ll come to understand that
even the simplest of structures has a few surprises that can make it useful in a variety of
powerful ways.
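As a quick illustration of those numeric operations, here is a minimal sketch assuming the redis-py client; the key names are made up for the example:

import redis

conn = redis.Redis()

conn.set('greeting', 'hello')        # a plain byte string value
conn.set('counter', 0)               # stored as a string, treated as an integer
conn.incr('counter')                 # 1
conn.incrby('counter', 15)           # 16
conn.incrbyfloat('counter', 2.5)     # the integer turns into the float 18.5
print(conn.get('counter'))           # b'18.5'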

14#  Posted by the thread starter on 2013-8-21 19:47
Writing to an HBase table from MapReduce (figure 3.13) as a data sink is similar to
reading from a table in terms of implementation.
HBase provides similar tooling to simplify the configuration. Let’s first make an
example of sink configuration in a standard MapReduce application.
In TimeSpent, the values of [k3,v3] generated by the aggregators are
[UserID:TotalTime]. In the MapReduce application, they're of the Hadoop serializable
types Text and LongWritable, respectively. Configuring output types is similar to
configuring input types, with the exception that the [k3,v3] output types can’t be
inferred by the OutputFormat:
Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
In this case, no line numbers are specified. Instead, the TextOutputFormat schema creates
a tab-separated output file containing first the UserID and then the TotalTime. What's
written to disk is the String representation of both types.

15#  Posted by the thread starter on 2013-8-21 19:47
The last step is wiring your reducer into the job configuration. You need to specify
the destination table along with all the appropriate types. Once again, it’s
TableMapReduceUtil to the rescue; it sets up the TableOutputFormat for you! You
use IdentityTableReducer, a provided class, because you don’t need to perform any
computation in the Reduce Step:
TableMapReduceUtil.initTableReducerJob(
"users",
IdentityTableReducer.class,
job);
Now your job is completely wired up, and you can proceed as normal. Unlike the case
where map tasks are reading from HBase, tasks don’t necessarily write to a single region.
The writes go to the region that is responsible for the rowkey that is being written by the
reduce task. The default partitioner that assigns the intermediate keys to the reduce
tasks doesn’t have knowledge of the regions and the nodes that are hosting them and
therefore can’t intelligently assign work to the reducers such that they write to the local
regions. Moreover, depending on the logic you write in the reduce task, which doesn’t
have to be the identity reducer, you might end up writing all over the table.

16#  Posted by the thread starter on 2013-8-21 20:29
You’d like to know the ratio of how much time a user spends on the site to their total
twit count. Although this is an easy question to answer, right now the relevant data is
split between two different datasets. You'd like to join this data such that all the
information about a user is in a single row. These two datasets share a common attribute:
UserID. This will be the join key. The result of performing the join and dropping
unused fields looks like this:
UserID TwitCount TimeSpent
Yvonn66 48 30s
Mario23 56 2s
Rober4 2 6s
Masan46 47 35s
Joins in the relational world are a lot easier than in MapReduce. Relational engines
enjoy many years of research and tuning around performing joins. Features like
indexing help optimize join operations. Moreover, the data typically resides on a
single physical server. Joining across multiple relational servers is far more complicated
and isn't common in practice. A join in MapReduce means joining on data spread
across multiple servers. But the semantics of the MapReduce framework make it easier
than trying to do a join across different relational database systems. There are a couple
of different variations of each type, but a join implementation is either map-side or
reduce-side. They're referred to as map- or reduce-side because that's the task where
records from the two sets are linked. Reduce-side joins are more common because
they're easier to implement. We'll describe those first.
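To make the reduce-side idea concrete, here is a minimal sketch in the same pseudocode style the book uses for map_timespent; the function names, tag strings, and field layout are illustrative, not taken from the text:

map_users(line_num, line):
    userid, name, email, twit_count = split(line)
    emit(userid, ("users", twit_count))            # tag each record with its source dataset

map_timespent(line_num, line):
    userid, timespent = split(line)
    emit(userid, ("timespent", timespent))

# The shuffle brings every tagged value for one UserID to the same reducer,
# which is where the two datasets are actually linked.
reduce_join(userid, tagged_values):
    record = {}
    for tag, value in tagged_values:
        if tag == "users":
            record["TwitCount"] = value
        else:
            record["TimeSpent"] = value
    emit(userid, (record["TwitCount"], record["TimeSpent"]))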

17#  Posted by the thread starter on 2013-8-21 20:41
When one of the datasets is small enough that it can fit in the memory of the map task, the problem is solved: load the smaller dataset into
a hash-table so the map tasks can access it while iterating over the other dataset. In
these cases, you can skip the Shuffle and Reduce Steps entirely and emit your final
output from the Map Step. Let’s go back to the same example. This time you’ll put the
Users dataset into memory. The new map_timespent task looks like this:
map_timespent(line_num, line):
    users_recs = read_timespent("/path/to/users.csv")      # load the Users dataset into an in-memory hash-table
    userid, timespent = split(line)
    record = {"TimeSpent" : timespent}
    record = merge(record, users_recs[userid])              # the join: pull this user's fields (including TwitCount) from memory
    emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))
Compared to the last version, this looks like cheating! Remember, though, you can
only get away with this approach when you can fit one of the datasets entirely into
memory. In this case, your join will be much faster.
There are of course implications to doing joins like this. For instance, each map task
is processing a single split, which is equal to one HDFS block (typically 64–128 MB), but
the join dataset that it loads into memory is 1 GB. Now, 1 GB can certainly fit in memory,
but the cost involved in creating a hash-table for a 1 GB dataset for every 128 MB of data
being joined makes it not such a good idea.

18#  Posted by the thread starter on 2013-8-21 20:42
Hadoop MapReduce assumes your map and reduce tasks are idempotent. This
means the map and reduce tasks can be run any number of times with the same
input and produce the same output state. This allows MapReduce to provide fault
tolerance in job execution and also take maximum advantage of cluster processing
power. You must take care, then, when performing stateful operations. HBase's
Increment command is an example of such a stateful operation.
For example, suppose you implement a row-counting MapReduce job that maps over
every key in the table and increments a cell value. When the job is run, the JobTracker
spawns 100 mappers, each responsible for 1,000 rows. While the job is running, a
disk drive fails on one of the TaskTracker nodes. This causes the map task to fail,
and Hadoop assigns that task to another node. Before failure, 750 of the keys were
counted and incremented. When the new instance takes up that task, it starts again
at the beginning of the key range. Those 750 rows are counted twice.
Instead of incrementing the counter in the mapper, a better approach is to emit
["count",1] pairs from each mapper. Failed tasks are recovered, and their output
isn’t double-counted. Sum the pairs in a reducer, and write out a single value from
there. This also avoids an unduly high burden being applied to the single machine
hosting the incremented cell.
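In the book's pseudocode style, that safer counting pattern looks roughly like this (a sketch; the function and key names are illustrative):

map_count(rowkey, row):
    emit("count", 1)                 # stateless: rerunning a failed task changes nothing

reduce_count(key, counts):
    emit(key, sum(counts))           # a single total, written to the target cell once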
Another thing to note is a feature called speculative execution. When certain tasks
are running more slowly than others and resources are available on the cluster,
Hadoop schedules extra copies of the task and lets them compete. The moment any
one of the copies finishes, it kills the remaining ones. This feature can be enabled or
disabled through the Hadoop configuration and should be disabled if the MapReduce
jobs are designed to interact with HBase.
