添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

PairRDD及其创建

键值对RDD称为PairRDD,通常用来做 聚合计算 。Spark为Pair RDD提供了许多专有的操作。

1
2
3
4
# 创建pair rdd: map 或者 读取键值对格式自动转成pairrdd

# 每行的第一个单词作为key,line作为value
pairs = lines.map(lambda line: (line.split(' ')[0], line))

Pair RDD 的转化操作分为单个和多个RDD的转化操作。

单个Pair RDD转化

reduceByKey(func)

合并 含有相同键的值,也称作 聚合

1
2
3
4
5
6
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
# [('a', 2), ('b', 1)]
# 这种写法也可以
rdd.reduceByKey(lambda x, y: x+y).collect()

groupByKey

对具有相同键的值进行 分组 。会生成hash分区的RDD

1
2
3
4
5
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

说明:如果对键进行分组以便对每个键进行聚合(如sum和average),则用 reduceByKey aggregateByKey 性能更好

combineByKey

合并具有相同键的值,但是 返回不同类型 (K, V) - (K, C)。最常用的聚合操作。

1
2
3
4
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def add(a, b): return a + str(b)
sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]

下面是combineByKey的源码和参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def combineByKey[C](
createCombiner: V => C, // V => C的转变 / 初始值 / 创建one-element的list
mergeValue: (C, V) => C, // 将原V累加到新的C
mergeCombiners: (C, C) => C, // 两个C合并成一个
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
//实现略
}
// 求平均值
val scores = sc.parallelize(
List(("chinese", 88.0) , ("chinese", 90.5) , ("math", 60.0), ("math", 87.0))
)
val avg = scores.combineByKey(
(v) => (v, 1),
(acc: (Float, Int), V) => (acc._1 + v, acc._2 + 1),
(acc1: (Float, Int), acc2: (Float, Int) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
).map{case (key, value) => (key, value._1 / value._2.toFloat)}
1
2
3
4
5
6
7
8
9
10
11
# 求平均值
nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
sum_count = nums.combineByKey(
lambda x: (x, 1),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
# [('c', (170, 2)), ('m', (95, 1))]
avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
# {'c': 85, 'm': 95}
avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()

mapValues(f)

对每个pair RDD中的每个Value应用一个func,不改变Key。其实也是 对value做map 操作。一般我们只想访问pair的值的时候,可以用 mapValues 。类似于map{case (x, y): (x, func(y) )}

1
2
3
4
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
[('a', 3), ('b', 1)]

mapPartitions(f)

map 的一个变种,都需要传入一个函数f,去处理数据。不同点如下:

  • map: f应用于每一个元素。
  • mapPartitions: f应用于每一个分区。分区的内容以Iterator[T]传入f,f的输出结果是Iterator[U]。最终RDD的由所有分区经过输入函数处理后的结果得到的。
  • 优点:我们可以为每一个分区做一些初始化操作,而不用为每一个元素做初始化。例如,初始化数据库,次数n。map时:n=元素数量,mapPartitions时:n=分区数量。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    # 1. 每个元素加1
    def f(iterator):
    print ("called f")
    return map(lambda x: x + 1, iterator)
    rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
    rdd.mapPartitions(f).collect() # 只调用2次f
    """
    called f
    called f
    [2, 3, 4, 5, 6]
    """

    # 2. 分区求和
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
    def f(iterator):
    print "call f"
    yield sum(iterator)
    rdd.mapPartitions(f).collect() # 调用2次f,分区求和
    """
    call f
    call f
    [6, 15]
    """

    mapPartitionsWithIndex(f)

    mapPartitions 一样,只是多了个partition的index。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    rdd = sc.parallelize(["yellow","red","blue","cyan","black"], 3)
    def g(index, item):
    return "id-{}, {}".format(index, item)
    def f(index, iterator):
    print 'called f'
    return map(lambda x: g(index, x), iterator)
    rdd.mapPartitionsWithIndex(f).collect()
    """
    called f
    called f
    called f
    ['id-0, yellow', 'id-1, red', 'id-1, blue', 'id-2, cyan', 'id-2, black']
    """

    repartition(n)

    生成新的RDD,分区数目为n。会增加或者减少 RDD的并行度。会对分布式数据集进行 shuffle 操作, 效率低 。如果只是想减少分区数,则使用 coalesce ,不会进行shuffle操作。

    1
    2
    3
    4
    5
    6
    7
    >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
    >>> sorted(rdd.glom().collect())
    [[1], [2, 3], [4, 5], [6, 7]]
    >>> len(rdd.repartition(2).glom().collect())
    2
    >>> len(rdd.repartition(10).glom().collect())
    10

    coalesce(n)

    合并,减少分区数,默认不执行shuffle操作。

    1
    2
    3
    4
    sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
    # [[1], [2, 3], [4, 5]]
    sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
    # [[1, 2, 3, 4, 5]]

    flatMapValues(f)

    打平values,[("k", ["v1", "v2"])] -- [("k","v1"), ("k", "v2")]

    1
    2
    3
    4
    x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    def f(x): return x
    x.flatMapValues(f).collect()
    # [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

    values

    sortByKey

    返回一个对键进行排序的RDD。会生成范围分区的RDD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    sc.parallelize(tmp).sortByKey().first()
    # ('1', 3)
    sc.parallelize(tmp).sortByKey(True, 1).collect()
    # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    sc.parallelize(tmp).sortByKey(True, 2).collect()
    # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

    tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    # [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

    两个Pair RDD转化

    substract

    留下在x中却不在y中的元素

    1
    2
    3
    4
    x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    y = sc.parallelize([("a", 3), ("c", None)])
    sorted(x.subtract(y).collect())
    #[('b', 4), ('b', 5)]

    substractByKey

    删掉X中在Y中也存在的Key所包含的所有元素

    内连接,从x中去和y中一个一个的匹配,(k, v1), (k, v2) -- (k, (v1, v2))

    1
    2
    3
    4
    x = sc.parallelize([("a", 1), ("b", 4)])
    y = sc.parallelize([("a", 2), ("a", 3)])
    sorted(x.join(y).collect())
    # [('a', (1, 2)), ('a', (1, 3))]

    leftOuterJoin

    左边RDD的键都有,右边没有的配None

    1
    2
    3
    4
    x = sc.parallelize([('a', 1), ('b', 4)])
    y = sc.parallelize([('a', 2)])
    sorted(x.leftOuterJoin(y).collect())
    # [('a', (1, 2)), ('b', (4, None))]

    rightOuterJoin

    右边RDD的键都有,左边没有的配None

    1
    2
    3
    4
    5
    6
    x = sc.parallelize([('a', 1), ('b', 4)])
    y = sc.parallelize([('a', 2)])
    sorted(x.rightOuterJoin(y).collect())
    # [('a', (1, 2))]
    sorted(y.rightOuterJoin(x).collect())
    # [('a', (2, 1)), ('b', (None, 4))]

    cogroup

    将两个RDD中拥有相同键的value分组到一起,即使两个RDD的V不一样

    1
    2
    3
    4
    5
    6
    x = sc.parallelize([('a', 1), ('b', 4)])
    y = sc.parallelize([('a', 2)])
    x.cogroup(y).collect()
    # 上面显示的是16进制
    [(x, tuple(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
    # [('a', ([1], [2])), ('b', ([4], []))]

    countByKey

    对每个键对应的元素分别计数

    1
    2
    3
    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
    rdd.countByKey().items() # 转换成一个dict,再取所有元素
    # [('a', 2), ('b', 1)]

    collectAsMap

    返回一个map

    1
    2
    3
    m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    m[1] - 2 # 当做map操作即可
    m[3] - 4

    lookup(key)

    返回键的RDD中的值列表。如果RDD具有已知的分区器,则通过仅搜索key映射到的分区来高效地完成该操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    l = range(1000) # 1,2,3,...,1000
    rdd = sc.parallelize(zip(l, l), 10) # 键和值一样,10个数据分片,10个并行度,10个task
    rdd.lookup(42) # slow
    # [42]
    sorted_rdd = rdd.sortByKey()
    sorted_rdd.lookup(42) # fast
    # [42]
    rdd = sc.parallelize([('a', 'a1'), ('a', 'a2'), ('b', 'b1')])
    rdd.lookup('a')[0]
    # 'a1'

    当数据是键值对组织的时候, 聚合具有相同键的元素 是很常见的操作。基础RDD有 fold() , combine() , reduce() ,Pair RDD有 combineByKey() 最常用 , reduceByKey() , foldByKey() 等。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    ## 方法一:mapValues和reduceByKey
    rdd = sc.parallelize([('a', 1), ('a', 3), ('b', 4)])
    maprdd = rdd.mapValues(lambda x : (x, 1))
    # [('a', (1, 1)), ('a', (3, 1)), ('b', (4, 1))]
    reducerdd = maprdd.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    # [('a', (4, 2)), ('b', (4, 1))]
    reducerdd.mapValues(lambda x : x[0]/x[1]).collect()
    # [('a', 2), ('b', 4)]

    ## 方法二 combineByKey 最常用的
    nums = sc.parallelize([('c', 90), ('m', 95), ('c', 80)])
    sum_count = nums.combineByKey(
    lambda x: (x, 1),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
    )
    # [('c', (170, 2)), ('m', (95, 1))]
    avg_map = sum_count.mapValues(lambda (sum, count): sum/count).collectAsMap()
    # {'c': 85, 'm': 95}
    avg_map = sum_count.map(lambda key, s_c: (key, s_c[0]/s_c[1])).collectAsMap()

    统计单词计数

    1
    2
    3
    4
    5
    6
    7
    rdd = sc.textFile('README.md')
    words = rdd.flatMap(lambda x: x.split(' '))
    # 568个
    result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
    # 289
    result.top(2)
    # [('your', 1), ('you', 4)]

    在分布式程序中,通信的代价是很大的。因此控制数据分布以获得 最少的网络传输 可以极大地提升整体性能。Spark程序通过 控制RDD分区方式来减少通信开销 。使用 partitionBy 进行分区

  • 不需分区:给定RDD只需要被扫描一次
  • 需要分区:数据集在 多次基于键的操作 中使用,比如连接操作。 partitionBy 转化操作 ,生成新的RDD,为了多次计算,一般要进行持久化 persist
  • Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark不能显示控制具体每个键落在哪一个工作节点上,但是Spark可以确保同一组的键出现在同一个节点上。

  • Hash分区:将一个RDD分成了100个分区,hashcode(key)%100 相同的,会在同一个节点上面
  • 范围分区:将key在同一个范围区间内的记录放在同一个节点上
  • 一个简单的例子,内存中有一张很大的用户信息表 -- 即(UserId, UserInfo)组成的RDD,UserInfo包含用户订阅了的所有Topics。还有一张(UserId, LinkInfo)存放着过去5分钟用户浏览的Topic。现在要找出用户浏览了但是没有订阅的Topic数量。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val sc = new SparkContext(...)
    val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...").persist()

    def processNewLog(logFileName: String) {
    val events = sc.sequenceFile[UserId, LinkInfo](logFileName)
    val joined = userData.join(events) // (UserId, (UserInfo, LinkInfo))
    val offTopicVisits = joined.filter{
    case (UserId, (UserInfo, LinkInfo)) =>
    !UserInfo.topics.contains(LinkInfo.topic)
    }.count()
    print ("浏览了且未订阅的数量:" + offTopicVisits)
    }

    这段代码 不够高效 。因为每次调用 processNewLog 都会用 join 操作,但我们却不知道数据集是如何分区的。

    连接操作 ,会将两个数据集的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在机器上对所有键相同的记录进行操作。

    因为userData比events要大的多并且基本不会变化,所以有很多浪费效率的事情:每次调用时都对userData表进行计算hash值计算和跨节点数据混洗。

    解决方案 :在程序开始的时候,对userData表使用 partitionBy() 转换操作,将 这张表转换为哈希分区

    1
    2
    3
    val userData = sc.sequenceFile[UserId, UserInfo]("hdfs://...")
    .partitionBy(new HashPartioner(100)) // 构造100个分区
    .persist() // 持久化当前结果

    events是本地变量,并且只使用一次,所以为它指定分区方式没有什么用处。

    userData使用了 partitionBy() ,Spark就知道该RDD是根据键的hash值来分区的。在 userData.join(events) 时,Spark 只会对events进行数据混洗操作 。将events中特定UserId的记录发送到userData的对应分区所在的那台机器上。需要网络传输的数据就大大减少了,速度也就显著提升了。

    分区相关的操作

    Spark的许多操作都有将数据根据跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。类似 join 这样的二元操作,预先进行数据分区会导致其中至少一个RDD 不发生数据混洗

    获取好处的操作: cogroup , groupWith , join , leftOuterJoin , rightOuterJoin , groupByKey , reduceByKey , combineByKey , lookup

    为结果设好分区的操作: cogroup , groupWith , join , leftOuterJoin , rightOuterJoin , groupByKey , reduceByKey , combineByKey , partitionBy , sort , ( mapValues , flatMapValues , filter 如果父RDD有分区方式的话)

    其他所有的操作的结果都不会存在特定的分区方式。对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况结果会采取hash分区。

    PageRank

    PageRank的python版本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    """ PageRank算法
    author = PuLiming
    运行: bin/spark-submit files/pagerank.py data/mllib/pagerank_data.txt 10
    """
    from __future__ import print_function

    import re
    import sys
    from operator import add

    from pyspark import SparkConf, SparkContext

    def compute_contribs(urls, rank):
    """ 给urls计算
    Args:
    urls: 目标url相邻的urls集合
    rank: 目标url的当前rank

    Returns:
    url: 相邻urls中的一个url
    rank: 当前url的新的rank
    """
    num_urls = len(urls)
    for url in urls:
    yield (url, rank / num_urls)


    def split_url(url_line):
    """ 把一行url切分开来
    Args:
    url_line: 一行url,如 1 2

    Returns:
    url, neighbor_url
    """
    parts = re.split(r'\s+', url_line) # 正则
    return parts[0], parts[1]


    def compute_pagerank(sc, url_data_file, iterations):
    """ 计算各个page的排名
    Args:
    sc: SparkContext
    url_data_file: 测试数据文件
    iterations: 迭代次数

    Returns:
    status: 成功就返回0
    """

    # 读取url文件 ['1 2', '1 3', '2 1', '3 1']
    lines = sc.textFile(url_data_file).map(lambda line: line.encode('utf8'))

    # 建立Pair RDD (url, neighbor_urls) [(1,[2,3]), (2,[1]), (3, [1])]
    links = lines.map(lambda line : split_url(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache()
    # 初始化所有url的rank为1 [(1, 1), (2, 1), (3, 1)]
    ranks = lines.map(lambda line : (line[0], 1))

    for i in range(iterations):
    # (url, [(neighbor_urls), rank]) join neighbor_urls and rank
    # 把当前url的rank分别contribute到其他相邻的url (url, rank)
    contribs = links.join(ranks).flatMap(
    lambda url_urls_rank: compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1])
    )
    # 把url的所有rank加起来,再赋值新的
    ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * 0.85 + 0.15)

    for (link, rank) in ranks.collect():
    print("%s has rank %s." % (link, rank))

    return 0

    if __name__ == '__main__':

    if len(sys.argv) != 3:
    print("Usage: python pagerank.py <data.txt> <iterations>", file = sys.stderr)
    sys.exit(-1)

    # 数据文件和迭代次数
    url_data_file = sys.argv[1]
    iterations = int(sys.argv[2])

    # 配置 SparkContext
    conf = SparkConf().setAppName('PythonPageRank')
    conf.setMaster('local')
    sc = SparkContext(conf=conf)

    ret = compute_pagerank(sc, url_data_file, iterations)
    sys.exit(ret)

    PageRank的scala版本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    val sc = new SparkContext(...)
    val links = sc.objectFile[(String, Seq[String])]("links")
    .partitionBy(new HashPartitioner(100))
    .persist()

    var ranks = links.mapValues(_ => 1.0)

    // 迭代10次
    for (i <- 0 until 10) {
    val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
    links.map(dest => (dest, rank / links.size))
    }
    ranks = contributions.reduceByKey(_ + _).mapValues(0.15 + 0.85* _)
    }
    ranks.saveAsTextFile("ranks")

    当前scala版本的PageRank算法的优点:

  • links每次都会和ranks发生连接操作,所以一开始就对它进行分区 partitionBy ,就不会通过网络进行数据混洗了,节约了相当可观的网络通信开销
  • 对links进行 persist ,留在内存中,每次迭代使用
  • 第一次创建ranks,使用 mapValues 保留了父RDD的分区方式,第一次连接开销就会很小
  • reduceByKey 后已经是分区了,再使用 mapValues 分区方式,再次和links进行 join 就会更加 高效
  • 所以对分区后的RDD尽量使用 mapValues 保留父分区方式,而不要用 map 丢失分区方式。