-
-
Notifications
You must be signed in to change notification settings - Fork 9
/
OptimizerAsync.R
148 lines (130 loc) · 5.34 KB
/
OptimizerAsync.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#' @title Asynchronous Optimizer
#'
#' @include mlr_optimizers.R
#'
#' @description
#' The [OptimizerAsync] implements the asynchronous optimization algorithm.
#' The optimization is performed asynchronously on a set of workers.
#'
#' @details
#' [OptimizerAsync] is the abstract base class for all asynchronous optimizers.
#' It provides the basic structure for asynchronous optimization algorithms.
#' The public method `$optimize()` is the main entry point for the optimization and runs in the main process.
#' The method starts the optimization process by starting the workers and pushing the necessary objects to the workers.
#' Optionally, a set of points can be created, e.g. an initial design, and pushed to the workers.
#' The private method `$.optimize()` is the actual optimization algorithm that runs on the workers.
#' Usually, the method proposes new points, evaluates them, and updates the archive.
#'
#' @export
OptimizerAsync = R6Class("OptimizerAsync",
inherit = Optimizer,
public = list(
#' @description
#' Performs the optimization on a [OptimInstanceAsyncSingleCrit] or [OptimInstanceAsyncMultiCrit] until termination.
#' The single evaluations will be written into the [ArchiveAsync].
#' The result will be written into the instance object.
#'
#' @param inst ([OptimInstanceAsyncSingleCrit] | [OptimInstanceAsyncMultiCrit]).
#'
#' @return [data.table::data.table()]
optimize = function(inst) {
optimize_async_default(inst, self)
}
),
private = list(
.xdt = NULL,
.ys = NULL
)
)
#' @title Default Asynchronous Optimization
#'
#' @description
#' Used internally in [OptimizerAsync].
#'
#' @param instance [OptimInstanceAsync].
#' @param optimizer [OptimizerAsync].
#' @param design [data.table::data.table()]\cr
#' (Initial) design send to the queue.
#' @param n_workers
#' Number of workers to be started.
#' Defaults to the number of workers set by [rush::rush_plan()].
#' @keywords internal
#' @export
optimize_async_default = function(instance, optimizer, design = NULL, n_workers = NULL) {
assert_class(instance, "OptimInstanceAsync")
assert_class(optimizer, "OptimizerAsync")
assert_data_table(design, null.ok = TRUE)
instance$archive$start_time = Sys.time()
get_private(instance)$.initialize_context(optimizer)
call_back("on_optimization_begin", instance$objective$callbacks, instance$objective$context)
# send design to workers
if (!is.null(design)) instance$archive$push_points(transpose_list(design))
if (getOption("bbotk_local", FALSE)) {
# debug mode runs .optimize() in main process
rush = rush::RushWorker$new(instance$rush$network_id, remote = FALSE)
instance$rush = rush
instance$archive$rush = rush
call_back("on_worker_begin", instance$objective$callbacks, instance$objective$context)
# run optimizer loop
get_private(optimizer)$.optimize(instance)
call_back("on_worker_end", instance$objective$callbacks, instance$objective$context)
} else {
# run .optimize() on workers
rush = instance$rush
# FIXME: How to pass globals and packages?
if (rush$n_pre_workers) {
# start remote workers
lg$info("Starting to optimize %i parameter(s) with '%s' and '%s' on %i remote worker(s)",
instance$search_space$length,
optimizer$format(),
instance$terminator$format(with_params = TRUE),
rush$n_pre_workers
)
rush$start_remote_workers(
worker_loop = bbotk_worker_loop,
packages = c(optimizer$packages, "bbotk"), # add packages from objective
optimizer = optimizer,
instance = instance)
} else if (rush::rush_available()) {
# local workers
lg$info("Starting to optimize %i parameter(s) with '%s' and '%s' on %i remote worker(s)",
instance$search_space$length,
optimizer$format(),
instance$terminator$format(with_params = TRUE),
rush::rush_config()$n_workers
)
rush$start_local_workers(
worker_loop = bbotk_worker_loop,
packages = c(optimizer$packages, "bbotk"), # add packages from objective
optimizer = optimizer,
instance = instance,
wait_for_workers = TRUE)
} else {
stop("No rush plan available to start local workers and no pre-started remote workers found. See `?rush::rush_plan()`.")
}
}
# wait until optimization is finished
# check terminated workers when the terminator is "none"
while(TRUE) {
Sys.sleep(1)
instance$rush$print_log()
# fetch new results for printing
new_results = instance$rush$fetch_new_tasks()
if (nrow(new_results)) {
lg$info("Results of %i configuration(s):", nrow(new_results))
lg$info(capture.output(print(new_results, class = FALSE, row.names = FALSE, print.keys = FALSE)))
}
if (instance$rush$all_workers_lost && !instance$is_terminated && !instance$rush$all_workers_terminated) {
stop("All workers have crashed.")
}
if (instance$is_terminated) break
if (instance$rush$all_workers_terminated) break
}
# assign result
get_private(optimizer)$.assign_result(instance)
lg$info("Finished optimizing after %i evaluation(s)", instance$archive$n_evals)
lg$info("Result:")
lg$info(capture.output(print(instance$result, lass = FALSE, row.names = FALSE, print.keys = FALSE)))
call_back("on_optimization_end", instance$objective$callbacks, instance$objective$context)
return(instance$result)
}