说明

以下所有内容摘自《Prallel Computing for Data Science with Examples in R, C++ and CUDA》(数据科学中的并行计算:以R、C++和CUDA为例)这本书,作者是美国加州大学戴维斯分校的统计系创始人之一,统计学和计算机双教授,还精通汉语(是的,你没看错,这老头就是这么牛逼。。。),后面花点时间准备再摘几个经典并实用的,对这块感兴趣的强烈推荐这本书!


循环调度的通用记法

  • 静态调度
  • 动态调度
  • 分块
  • 反向调度

snow中的分块

通过snow包中的clusterSplit()函数来对循环的i值进行分块。假设lnks有500行,并且我们有4个worker。这里的目标是把行号1,2,...,500划分到4个相等(或大致相等)的子集合中,这些子集合就是每个worker所要处理的索引块。显然,1~125,126~250,251~375,376~500分别对应串行代码中外层for循环的i值。第一个worker将处理迭代为1:125的外层循环,以此类推。

In [1]:
library(parallel)
In [2]:
doichunk <- function(ichunk) {
    tot <- 0
    nr <- nrow(lnks) # lnks在worker处是全局变量
    for (i in ichunk) {
        tmp <- lnks[(i+1):nr, ] %*% lnks[i, ]
        tot <- tot + sum(tmp)
    }
    tot
}
In [4]:
mutoutpar <- function(cls, lnks) {
    nr <- nrow(lnks) # lnks在manager处是全局变量
    clusterExport(cls, "lnks")
    ichunks <- clusterSplit(cls, 1:(nr - 1)) #把块1:125发送给第一个worker......375:499发送给第四个worker
    tots <- clusterApply(cls, ichunks, doichunk) # 返回的列表包含四个元素,而不是之前的499个
    Reduce(sum, tots) / nr
    }
In [5]:
initcls <- function(workers) {
    makeCluster(spec = workers)
}

cls <- initcls(4)
clusterSplit(cls, 1:50) # 返回四个列表
    1. 1
    2. 2
    3. 3
    4. 4
    5. 5
    6. 6
    7. 7
    8. 8
    9. 9
    10. 10
    11. 11
    12. 12
    13. 13
    1. 14
    2. 15
    3. 16
    4. 17
    5. 18
    6. 19
    7. 20
    8. 21
    9. 22
    10. 23
    11. 24
    12. 25
    1. 26
    2. 27
    3. 28
    4. 29
    5. 30
    6. 31
    7. 32
    8. 33
    9. 34
    10. 35
    11. 36
    12. 37
    1. 38
    2. 39
    3. 40
    4. 41
    5. 42
    6. 43
    7. 44
    8. 45
    9. 46
    10. 47
    11. 48
    12. 49
    13. 50
In [15]:
snowsim <- function(nr,nc, cls) {
    lnks <<- 
        matrix(sample(0:1, (nr*nc), replace= TRUE),
              nrow = nr)
    system.time(mutoutpar(cls, lnks))
}
In [16]:
initmc <- function(nworkers) {
    makeCluster(nworkers) 
}

cl2 <- initmc(2)
snowsim(2000, 2000, cl2)
   user  system elapsed 
  0.086   0.015  58.183 

可以看到,相对之前没有分块,elapsed时间明显减少


示例:所有可能回归

主要的问题是预测变量的选择:一方面,要在回归方程中包含所有相关的预测变量。另一方面,我们必须避免过拟合。

假设我们有n个观测值,p个预测变量。通过无偏估计$adjusted$ $R^2$来选择模型

并行化策略

一共有$2^p$种可能的模型,因此计算量会相当大————这非常适合使用并行计算。在这里有两种可能:

  • (a)对预测变量集合中的每一个,我们都可以并行的对其做回归计算。例如,在计算使用预测变量2和5的模型时,所有的进程都一起工作。
  • (b)我们可以给每个进程分配不同的预测变量集合,进程随后在它所分配的集合上做回归计算。例如,一个进程可能会对使用了预测变量2和5的全部模型进行计算,另一个进程则处理使用预测变量8,9和12的模型,等等

选项(a)存在问题。对一个给定的包含m个预测变量的集合,我们必须先计算各种平方和与乘积和。每个加法都有n个被加数,一共有$O(m^2)$个加法计算,这使得计算复杂度为$O(nm^2)$。之后必须进行矩阵求逆(或者其它等价计算,比如QR分解),其复杂度为$O(m^3)^2$。

矩阵求逆迄今仍然不是一个易并行的计算,所以选项(b)简单很多,是易并行的,事实上它包含一个循环。

代码

下面并行由snow包实现,它在所有满足条件的模型中计算校正决定系数值,这些模型预测变量集合的大小最大为k。用户可以选择静态或者动态调度,或者反向调度,用户可以指定一个(不变的)块的大小。

算法的整体策略:

  • manager决定了所有预测变量集合的大小,最大为k
  • manager分配worker来处理指定的预测变量集合
  • 每个worker调用R的lm()线性模型函数,对分配给它的每个预测变量集合来计算校正决定系数
  • manager收集结果,将其聚合到一个结果矩阵。矩阵的第i行是校正系数和其所对应的预测变量集合
In [7]:
# 回归响应变量列Y以及Xi预测变量的所有可能子集
# 子集规模最大为K
# 返回每个子集的校正决定系数

# 调度参数:
# static(clusterApply())
# dynamic(clusterApply())
# 颠倒任务的顺序
# 块大小(动态情形下)

# 参数:
# cls: Snow集群
# x: 预测变量矩阵,每列一个
# y: 响应变量向量
# k: 预测变量集合大小的最大值
# reverse: TRUE表示对迭代顺序进行颠倒
# dyn: TRUE表示动态调度
# chunksize: 调度块的规模

# 返回值:
# R矩阵,显示校正决定系数
# 使用预测集合来进行索引

snowapr <- function(cls, x, y, k, reverse = F, dyn = F, chunksize = 1) {
    require(parallel)
    p <- ncol(x)
    # 生成预测变量子集,一个R的list,1个元素代表一个预测变量子集
    allcombs <- genallcombs(p, k)
    ncombs <- length(allcombs)
    clusterExport(cls, "dolpset")
    # 设定任务索引
    tasks <- if(!reverse)
        seq(1, ncombs, chunksize) 
    else
        seq(ncombs, 1, -chunksize)
    
    if(!dyn) {
        out <- clusterApply(cls, tasks, dochunk, x, y, allcombs, chunksize)
    }
    # out的每个元素都由校正决定系数(adjusted R^2)和产生这个值的预测变量集合的索引构成,然后把这些变量集合到一个矩阵中
    Reduce(rbind, out)
}
In [8]:
# 生成1..p的所有大小 <= k的非空集合;
# 返回一个索引向量形式的R list,每个元素代表一个预测变量集合
genallcombs <- function(p, k) {
    allcombs <- list()
    for (i in 1:k) {
        tmp <- combn(1:p, i)
        allcombs <- c(allcombs, matrixtolist(tmp, rc = 2))
    }
    allcombs
}
In [9]:
# 从矩阵中提取行(rc = 1)或列(rc = 2), 生成一个list
matrixtolist <- function(rc, m) {
    if (rc == 1) {
        Map(function(rownum) m[rownum, ], 1:nrow(m))
    } else
        Map(function(colnum) m[, colnum], 1:ncol(m))
}
In [10]:
# 处理allcombs块中的所有预测变量集合
# 这个分块的第一个索引是psetsstart
dochunk <- function(psetsstart, x, y, allcombs, chunksize) {
    ncombs <- length(allcombs)
    lasttask <- min(psetsstart + chunksize - 1, ncombs)
    t(sapply(allcombs[psetsstart:lasttask], dolpset, x, y))
}
In [11]:
# 找到指定预测变量集合onepset的校正决定系数(adjusted R^2);
# 返回值是校正决定系数,紧跟着用0来填充空位的预测变量集合的索引
# 为了方便,对dolpset()的调用所返回的向量的长度都是k+1
# 例如,对k=4,(0.28, 1, 3, 0, 0)意味着预测变量集合是由x的第1和第3列,R平方值为0.28
dolpset <- function(onepset, x, y) {
    slm <- summary(lm(y ~ x[, onepset]))
    nOs <- ncol(x) - length(onepset)
    c(slm$adj.r.squared, onepset, rep(0, nOs))
}
In [12]:
# 看起来最好的预测变量集合
snowtest <- function(cls, n, p, k, chunksize = 1, dyn = F, rvrs = F) {
    gendata(n, p)
    snowapr(cls, x, y, k, rvrs, dyn, chunksize)
}
In [13]:
gendata <- function(n, p) {
    x <<- matrix(rnorm(n*p), ncol = p)
    y <<- x %*% c(rep(0.5, p)) + rnorm(n)
}

样例运行

In [14]:
initmc <- function(nworkers) {
    makeCluster(nworkers) 
}

cl8 <- initmc(8) 

# 在这里我们生成了大小n = 100的模拟数据,有p = 4个预测变量,预测变量集合的最大k =2,对使用预测变量1和3的模型,
# 即x的第2列和第4列,校正决定系数的最大值大约是0.398 
snowtest(cl8, 100, 4, 2)
0.18555501 0 0 0
0.18710472 0 0 0
0.15683083 0 0 0
0.13335694 0 0 0
0.30439221 2 0 0
0.36844511 3 0 0
0.31510141 4 0 0
0.31338252 3 0 0
0.32558192 4 0 0
0.28868463 4 0 0