library(nws)

# Number of tickets claimed per trip to the networkspace. ocOwn() reads this
# global; ocMergePermResults() replaces it for the second phase of work.
ocTicketBlockSize <- 300
# Per-participant mutable state (tickets held, invocation counter, workspace).
ocState <- new.env()

# Example of "owner computes" parallelism. Basic idea: we imagine a
# pool of (virtual) tickets, one per "task" to be done. A "task" can
# be any island of code. Each participant executes the same general
# control flow. When it hits a "task", it does the work for the task
# only if it has the ticket for it. The "ticket number" for a task is
# its position in the sequence of ownership checks: the first check is
# ticket 1, the second is ticket 2, and so on. A participant starts out
# with no tickets. If a participant invokes the ownership check with no
# valid tickets, it uses the networkspace to claim a block of tickets
# owned by no other participant. It then compares the invocation number
# with the tickets it owns and returns TRUE if the invocation is in its
# range of tickets, FALSE otherwise. Tickets become invalid once the
# invocation number exceeds the largest ticket in the block. Once the
# processing has been done for a group of related tasks, a function is
# called to merge the partial results from each participant. This too
# uses the networkspace.
#
# This is a very flexible framework for parallelism that frequently
# requires only modest modifications to the original sequential code,
# but it does require that every participant make the same sequence of
# ownership calls. It also usually requires that one participant play
# the role of manager: initializing certain global data, orchestrating
# data merges, generating final output. Often, as in this example, that
# distinction is realized via this module and not in the original code.

# Initialize (or re-initialize) this participant's local ticket state.
# maxT = -1 forces the next ocOwn() call to claim a fresh block.
ocReset <- function() {
  ocState$maxT <- -1
  ocState$stepper <- 0
  ocState$myTickets <- NULL
}

# Connect to the shared networkspace and decide this process's role.
#
# A process is a worker when the AM_ND_WORKER environment variable is
# non-empty; otherwise it is the master. The master also declares the shared
# 'ticket' variable (mode 'single') and seeds it with 1.
#
# wsName, serverHost, port: networkspace coordinates. The defaults are the
#   values this module originally hard-coded.
ocInit <- function(wsName = 'am nd workspace rob', serverHost = 'swift',
                   port = 8765) {
  ocState$worker <- Sys.getenv('AM_ND_WORKER') != ''

  if (ocState$worker) {
    # NOTE(review): useUse = TRUE presumably attaches to the workspace the
    # master created instead of creating/owning one (nws semantics) - confirm.
    ocState$ws <- netWorkSpace(wsName, serverHost = serverHost, port = port,
                               useUse = TRUE)
  } else {
    ocState$ws <- netWorkSpace(wsName, serverHost = serverHost, port = port)
    nwsDeclare(ocState$ws, 'ticket', 'single')
    nwsStore(ocState$ws, 'ticket', 1)
  }

  ocReset()
}

# Ownership check: returns TRUE iff this participant holds the ticket for the
# current invocation. Every participant must call this the same number of
# times, in the same order.
#
# When the current invocation lies beyond the block we hold, a new block is
# claimed. We fetch the shared 'ticket' counter and store back the new
# high-water mark. This relies on nws fetch semantics (fetch removes the
# value), so concurrent claimers wait until we store it again.
ocOwn <- function() {
  ocState$stepper <- ocState$stepper + 1

  if (ocState$maxT <= ocState$stepper) {
    ocState$minT <- nwsFetch(ocState$ws, 'ticket')
    ocState$maxT <- ocState$minT + ocTicketBlockSize
    nwsStore(ocState$ws, 'ticket', ocState$maxT)
    # Remember the start of every block we own; the merge functions use these.
    ocState$myTickets <- c(ocState$myTickets, ocState$minT)
  }

  # Both bounds must be checked together. The previous
  # `return (a) && (b)` form parsed as `return(a) && b`, so the upper bound
  # was never evaluated.
  ocState$minT <= ocState$stepper && ocState$stepper < ocState$maxT
}

# Merge phase-one (permutation) results across all participants.
#
# pr: named list of matrices that share a column count, taken from ncol(pr$gini).
#   Column k is assumed to be the output of ticket k - TODO confirm with the
#   caller.
# nextBlockSize: ticket block size for the next phase. The default (3) is the
#   value previously hard-coded here, because there are fewer sims than samples.
#
# Every participant publishes the columns for the ticket blocks it owns. Each
# participant then resets its local ticket state and switches to the
# phase-two block size. Workers block until the master publishes the merged
# result and return that. The master collates every block, resets the shared
# ticket counter, publishes the merged result and returns it.
ocMergePermResults <- function(pr, nextBlockSize = 3) {
  nCols <- ncol(pr$gini)

  # Publish the partial results for every block of work tickets we own.
  for (t in ocState$myTickets) {
    lim <- min(t + ocTicketBlockSize - 1, nCols)
    # myTickets is increasing, so every later block also starts past nCols.
    if (lim < t) break
    cols <- t:lim
    # Slice every component (not a hard-coded subset), so the master's merge
    # over names(pr) always finds a matching part. drop = FALSE keeps
    # single-column / single-row slices as matrices.
    nwsStore(ocState$ws, 'pr block',
             list(t = t, lim = lim,
                  parts = lapply(pr, function(m) m[, cols, drop = FALSE])))
  }

  ocReset()
  # ocOwn() reads the global, so the new size must be written there.
  ocTicketBlockSize <<- nextBlockSize

  # Workers now wait for the merged results.
  if (ocState$worker) return(nwsFind(ocState$ws, 'perm result'))

  # The master collates results and stores the complete set for workers.
  doneCols <- 0
  while (doneCols < nCols) {
    r <- nwsFetch(ocState$ws, 'pr block')
    cols <- r$t:r$lim
    doneCols <- doneCols + length(cols)
    for (n in names(pr)) {
      pr[[n]][, cols] <- r$parts[[n]]
    }
  }

  # Reset the ticket. To sync correctly, this must come before the
  # 'perm result' store that releases the workers.
  nwsStore(ocState$ws, 'ticket', 1)
  nwsStore(ocState$ws, 'perm result', pr)
  pr
}

# The phase-two merge below follows the same pattern as ocMergePermResults.
# Merge phase-two results (permutation means and chi-square p-values) across
# all participants. This is the final step.
#
# pm: named list of matrices that share a column count, taken from ncol(pm$gini).
#   Column k is assumed to be the output of ticket k - TODO confirm with the
#   caller.
# chip: matrix with the same column layout as the matrices in pm.
#
# Every participant publishes the columns for the ticket blocks it owns, and
# workers then quit. The master collates every block, closes the workspace
# and returns list(perm.mean = pm, chisq.pvalue = chip).
ocMergePermMean <- function(pm, chip) {
  nCols <- ncol(pm$gini)

  for (t in ocState$myTickets) {
    lim <- min(t + ocTicketBlockSize - 1, nCols)
    # myTickets is increasing, so every later block also starts past nCols.
    if (lim < t) break
    cols <- t:lim
    # Slice every component of pm (not a hard-coded subset), so the master's
    # merge over names(pm) always finds a matching part. The parts are nested
    # under 'pm' so they cannot collide with the 'chip' slice.
    nwsStore(ocState$ws, 'pm block',
             list(t = t, lim = lim,
                  pm = lapply(pm, function(m) m[, cols, drop = FALSE]),
                  chip = chip[, cols, drop = FALSE]))
  }

  # Workers are no longer needed.
  if (ocState$worker) quit(save = 'no')

  doneCols <- 0
  while (doneCols < nCols) {
    r <- nwsFetch(ocState$ws, 'pm block')
    cols <- r$t:r$lim
    doneCols <- doneCols + length(cols)
    for (n in names(pm)) {
      pm[[n]][, cols] <- r$pm[[n]]
    }
    chip[, cols] <- r$chip
  }

  # We are all done, so clean up the workspace.
  nwsClose(ocState$ws)
  list(perm.mean = pm, chisq.pvalue = chip)
}