# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
4
5import logging as log
6import os
7import re
Srikrishna Iyer9d9d86f2021-03-02 00:15:51 -08008import subprocess
9import tarfile
10from pathlib import Path
11
Guillermo Maturana76caf9a2021-04-08 14:04:15 -070012from Launcher import ErrorMessage, Launcher, LauncherError
Srikrishna Iyer9d9d86f2021-03-02 00:15:51 -080013from utils import VERBOSE, clean_odirs
14
15
class LsfLauncher(Launcher):
    """Launcher that submits jobs to LSF as `bsub` job arrays.

    Launcher instances that share the same (cfg, job name) pair are grouped
    into one job array. A single bash job script is generated per array; it
    switches on the array index to run the command of the matching job.
    """

    # Maps a cfg to the directory in which its job scripts are written.
    jobs_dir = {}

    # All launcher instances available for lookup:
    # cfg -> {job name -> [launcher instances, in creation order]}.
    jobs = {}

    # When the job completes, we try to read the job script output to determine
    # the outcome. It may not have been completely written the first time we
    # read it so we retry on the next poll, no more than 10 times.
    max_poll_retries = 10

    # Patterns matched against lines of bsub's 'email' style report. The
    # trailing whitespace is optional so that a final line without a newline
    # (or with CRLF line endings) is still recognized.
    _exit_code_re = re.compile(r"Exited with exit code (\d+)\.\s*$")
    _success_re = re.compile(r"Successfully completed\.\s*$")

    # Extracts the job ID from bsub's submission message ("Job <ID> ...").
    _job_id_re = re.compile(r"Job <([^>]+)>")

    # TODO: Add support for build/run/cov job specific resource requirements:
    #       cpu, mem, disk, stack.
    # TODO: Allow site-specific job resource usage setting using
    #       `DVSIM_LSF_CFG` environment variable.

    @staticmethod
    def prepare_workspace(project, repo_top, args):
        """Prepares the python virtualenv (if any) for use by remote jobs.

        project is passed to Launcher.set_pyvenv() to resolve the virtualenv.
        repo_top is unused here.
        args must provide `scratch_root`, the area in which a virtualenv
        tarball is extracted.

        If Launcher.pyvenv is a tarball rather than a directory, it is
        extracted into the scratch area (unless already extracted) and
        Launcher.pyvenv is updated to point to the extracted directory.
        """
        # Since we dispatch to remote machines, a project specific python
        # virtualenv, if it exists, needs to be activated when launching the
        # job.
        Launcher.set_pyvenv(project)
        if Launcher.pyvenv is None:
            return

        # If it is already a dir, then nothing to be done.
        if os.path.isdir(Launcher.pyvenv):
            return

        # If not, then it needs to be a valid tarball. Extract it in the
        # scratch area if it does not exist.
        stem = Path(Launcher.pyvenv).stem
        # Path.stem only drops the last suffix, so 'foo.tar.gz' yields
        # 'foo.tar'. Only strip a real '.tar' suffix; a name that merely ends
        # in the letters 'tar' must be left intact.
        if stem.endswith(".tar"):
            stem = stem[:-4]
        path = Path(args.scratch_root, stem)
        if not path.is_dir():
            log.info("[prepare_workspace]: [pyvenv]: Extracting %s",
                     Launcher.pyvenv)
            with tarfile.open(Launcher.pyvenv, mode='r') as tar:
                tar.extractall(args.scratch_root)
            log.info("[prepare_workspace]: [pyvenv]: Done: %s", path)
        Launcher.pyvenv = path

    @staticmethod
    def prepare_workspace_for_cfg(cfg):
        """Creates the directory that holds the job scripts for this cfg.

        The directory is <cfg.scratch_path>/lsf/<cfg.timestamp>. Older
        directories are pruned via clean_odirs() before it is created.
        """
        LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf",
                                         cfg.timestamp)
        clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2)
        os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True)

    @staticmethod
    def make_job_script(cfg, job_name):
        """Creates the job script.

        Once all jobs in the array are launched, the job script can be created.
        It is a bash script that takes the job index as a single argument.
        This index is set in the bsub command as '$LSB_JOBINDEX', which bsub
        sets as the actual index when launching that job in the array. This
        script is super simple - it is just a giant case statement that
        switches on the job index to run that specific job. This is preferred
        over creating individual scripts for each job which incurs additional
        file I/O overhead when the scratch area is on NFS, causing a slowdown.

        Returns the path to the job script.

        Raises LauncherError if the script could not be written; all jobs in
        the array are marked as failed before the exception is raised.
        """

        lines = ["#!/usr/bin/env bash\nset -e\n"]

        # Activate the python virtualenv if it exists.
        if Launcher.pyvenv:
            lines += ["source {}/bin/activate\n".format(Launcher.pyvenv)]

        lines += ["case $1 in\n"]
        for job in LsfLauncher.jobs[cfg][job_name]:
            # Redirect the job's stdout and stderr to its log file.
            cmd = "{} > {} 2>&1".format(job.deploy.cmd,
                                        job.deploy.get_log_path())
            lines += [" {})\n".format(job.index), " {};;\n".format(cmd)]

        # Throw error as a sanity check if the job index is invalid.
        lines += [
            " *)\n",
            " echo \"ERROR: Illegal job index: $1\" 1>&2; exit 1;;\n",
            "esac\n"
        ]
        if Launcher.pyvenv:
            lines += ["deactivate\n"]

        job_script = Path(LsfLauncher.jobs_dir[cfg], job_name)
        try:
            with open(job_script, "w", encoding='utf-8') as f:
                f.writelines(lines)
        except IOError as e:
            err_msg = "ERROR: Failed to write {}:\n{}".format(job_script, e)
            LsfLauncher._post_finish_job_array(cfg, job_name, err_msg)
            raise LauncherError(err_msg) from e

        log.log(VERBOSE, "[job_script]: %s", job_script)
        return job_script

    def __init__(self, deploy):
        """Registers this launcher in the job array of its (cfg, job name).

        deploy is the deployable object this launcher runs; its `sim_cfg` and
        `job_name` attributes select the job array this instance joins.
        """
        super().__init__(deploy)

        # Maintain the job script output as an instance variable for polling
        # and cleanup.
        self.bsub_out = None

        # If we already opened the job script output file (but have not
        # determined the outcome), then we maintain the file descriptor rather
        # than reopening it and starting all over again on the next poll.
        self.bsub_out_fd = None
        # Collected as a list of lines while reading; replaced by a single
        # joined string once the bsub report header has been found.
        self.bsub_out_err_msg = []
        self.bsub_out_err_msg_found = False

        # Set the job id.
        self.job_id = None

        # Polling retry counter.
        self.num_poll_retries = 0

        # Add self to the list of jobs.
        cfg_dict = LsfLauncher.jobs.setdefault(deploy.sim_cfg, {})
        job_name_list = cfg_dict.setdefault(deploy.job_name, [])
        job_name_list.append(self)

        # Job's index in the array (1-based, since it is taken after the
        # append above).
        self.index = len(job_name_list)

    def _do_launch(self):
        """Submits the whole job array via bsub.

        Does nothing unless this instance holds the highest index currently
        registered for its job array; that instance submits the array on
        behalf of all of its members.

        Raises LauncherError if the job script cannot be written, if bsub
        fails or times out, or if the job ID cannot be extracted from bsub's
        output. In each case all jobs in the array are marked as failed first.
        """
        job_name = self.deploy.job_name
        cfg = self.deploy.sim_cfg
        job_total = len(LsfLauncher.jobs[cfg][job_name])

        # The actual launching of the bsub command cannot happen until the
        # Scheduler has dispatched ALL jobs in the array.
        if self.index < job_total:
            return

        job_script = self.make_job_script(cfg, job_name)

        # Update the shell's env vars with self.exports. Values in exports must
        # replace the values in the shell's env vars if the keys match.
        exports = os.environ.copy()
        if self.deploy.exports:
            exports.update(self.deploy.exports)

        # Clear the magic MAKEFLAGS variable from exports if necessary. This
        # variable is used by recursive Make calls to pass variables from one
        # level to the next. Here, self.cmd is a call to Make but it's
        # logically a top-level invocation: we don't want to pollute the flow's
        # Makefile with Make variables from any wrapper that called dvsim.
        exports.pop('MAKEFLAGS', None)

        self._dump_env_vars(exports)

        # TODO: Arbitrarily set the max slot-limit to 100.
        job_array = "{}[1-{}]".format(job_name, job_total)
        if job_total > 100:
            job_array += "%100"

        # TODO: This needs to be moved to a HJson.
        if cfg.tool == "vcs":
            job_rusage = "\'rusage[vcssim=1,vcssim_dynamic=1:duration=1]\'"

        elif cfg.tool == "xcelium":
            job_rusage = "\'rusage[xcelium=1,xcelium_dynamic=1:duration=1]\'"

        else:
            job_rusage = None

        # Launch the job array. stdout and stderr of each array element are
        # both written to the same '<job_script>.<index>.out' file.
        cmd = [
            "bsub",
            # TODO: LSF project name could be site specific!
            "-P",
            cfg.project,
            "-J",
            job_array,
            "-oo",
            "{}.%I.out".format(job_script),
            "-eo",
            "{}.%I.out".format(job_script)
        ]

        if job_rusage:
            cmd += ["-R", job_rusage]

        cmd += ["/usr/bin/bash {} $LSB_JOBINDEX".format(job_script)]

        try:
            p = subprocess.run(cmd,
                               check=True,
                               timeout=60,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               env=exports)
        except subprocess.CalledProcessError as e:
            # Need to mark all jobs in this range with this fail pattern.
            err_msg = e.stderr.decode("utf-8").strip()
            self._post_finish_job_array(cfg, job_name, err_msg)
            raise LauncherError(err_msg) from e
        except subprocess.TimeoutExpired as e:
            # The submission itself hung; none of the jobs were launched.
            err_msg = "ERROR: bsub timed out after {} seconds.".format(
                e.timeout)
            self._post_finish_job_array(cfg, job_name, err_msg)
            raise LauncherError(err_msg) from e

        # Extract the job ID from bsub's submission message.
        result = p.stdout.decode("utf-8").strip()
        m = LsfLauncher._job_id_re.search(result)
        if m is None:
            err_msg = "Job ID not found!"
            self._post_finish_job_array(cfg, job_name, err_msg)
            raise LauncherError(err_msg)
        job_id = m.group(1)

        for job in LsfLauncher.jobs[cfg][job_name]:
            job.bsub_out = Path("{}.{}.out".format(job_script, job.index))
            job.job_id = "{}[{}]".format(job_id, job.index)
            job._link_odir("D")

    def poll(self):
        """Checks the job's progress and returns its status.

        Returns 'D' while the job is still dispatched / running, otherwise the
        final status of the job.
        """
        # It is possible we may have determined the status already.
        if self.status:
            return self.status

        if not self.bsub_out_fd:
            # If job id is not set, the bsub command has not been sent yet.
            if not self.job_id:
                return 'D'

            # We redirect the job's output to the log file, so the job script
            # output remains empty until the point it finishes. This is a very
            # quick way to check if the job has completed. If nothing has been
            # written to the job script output yet (or if it is not yet
            # created), then the job is still running.
            try:
                if not self.bsub_out.stat().st_size:
                    return "D"
            except FileNotFoundError:
                return "D"

            # If we got to this point, we can now open the job script output
            # file for reading.
            try:
                self.bsub_out_fd = open(self.bsub_out, "r")
            except IOError as e:
                self._post_finish(
                    "F",
                    ErrorMessage(
                        line_number=None,
                        message="ERROR: Failed to open {}\n{}.".format(
                            self.bsub_out, e),
                        context=[]))
                return "F"

        # Now that the job has completed, we need to determine its status.
        #
        # If the job successfully launched and it failed, the failure message
        # will appear in its log file (because of the stderr redirection).
        # But, in some cases, if there is something wrong with the command
        # itself, bsub might return immediately with an error message, which
        # will appear in the job script output file. We want to retrieve that
        # so that we can report the status accurately.
        #
        # At this point, we could run bjobs or bhist to determine the status,
        # but it has been found to be too slow, especially when running 1000s
        # of jobs. Plus, we have to read the job script output anyway to look
        # for those error messages.
        #
        # So we just read this file to determine both, the status and extract
        # the error message, rather than running bjobs or bhist. But there is
        # one more complication to deal with - if we read the file now, it is
        # possible that it may not have been fully updated. We will try reading
        # it anyway. If we are unable to find what we are looking for, then we
        # will resume reading it again at the next poll. We will do this up to
        # max_poll_retries times before giving up and flagging an error.
        #
        # TODO: Consider using the IBM Platform LSF Python APIs instead.
        #       (deferred due to shortage of time / resources).
        # TODO: Parse job telemetry data for performance insights.

        exit_code = self._get_job_exit_code()
        if exit_code is not None:
            self.exit_code = exit_code
            status, err_msg = self._check_status()
            # Prioritize error messages from bsub over the job's log file.
            if self.bsub_out_err_msg:
                err_msg = ErrorMessage(line_number=None,
                                       message=self.bsub_out_err_msg,
                                       context=[])
            self._post_finish(status, err_msg)
            return status

        self.num_poll_retries += 1
        # Fail the test if we have reached the max polling retries.
        if self.num_poll_retries >= LsfLauncher.max_poll_retries:
            # Wrap the message in an ErrorMessage, like every other failure
            # reported by this launcher.
            self._post_finish(
                "F",
                ErrorMessage(line_number=None,
                             message="ERROR: Reached max retries while "
                             "reading job script output {} to determine"
                             " the outcome.".format(self.bsub_out),
                             context=[]))
            return "F"

        return "D"

    def _get_job_exit_code(self):
        """Read the job script output to retrieve the exit code.

        Also read the error message if any, which will appear at the beginning
        of the log file followed by bsub's standard 'email' format output. It
        looks something like this:

          <stderr messages>
          ------------------------------------------------------------
          Sender: LSF System <...>
          Subject: ...
          ...

          Successfully completed.
          <OR>
          Exited with exit code 1.

          ...

        The extracted stderr messages are stored in self.bsub_out_err_msg. The
        line indicating whether it was successful or it failed with an exit
        code is used to return the exit code.

        Reading resumes from where the previous call stopped, since the open
        file object is kept across calls.

        Returns the exit code as an int if found, else None.
        """

        # Job script output must have been opened already.
        assert self.bsub_out_fd

        for line in self.bsub_out_fd:
            if not self.bsub_out_err_msg_found:
                if line.startswith("Sender"):
                    # Pop the line before the sender line (the separator).
                    self.bsub_out_err_msg = "\n".join(
                        self.bsub_out_err_msg[:-1])
                    self.bsub_out_err_msg_found = True
                else:
                    self.bsub_out_err_msg.append(line.strip())

            else:
                m = LsfLauncher._exit_code_re.match(line)
                if m:
                    # Return an int so that both outcomes have the same type.
                    return int(m.group(1))

                # A success report is only honored if bsub emitted no error
                # messages ahead of its report.
                if not self.bsub_out_err_msg:
                    if LsfLauncher._success_re.match(line):
                        return 0
        return None

    def kill(self):
        """Kills the job with `bkill` (if it was submitted) and marks it 'K'.

        A failure of the bkill command is logged; the job is marked as killed
        regardless.
        """
        if self.job_id:
            try:
                subprocess.run(["bkill", "-s", "SIGTERM", self.job_id],
                               check=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
            except subprocess.CalledProcessError as e:
                log.error("Failed to kill job: %s",
                          e.stderr.decode("utf-8").strip())
        else:
            log.error("Job ID for %s not found", self.deploy.full_name)

        self._post_finish(
            'K',
            ErrorMessage(line_number=None, message="Job killed!", context=[]))

    def _post_finish(self, status, err_msg):
        """Releases the job script output file and finalizes the job.

        status is the final status of the job and err_msg describes the
        failure, if any; both are forwarded to the base class. If no exit code
        has been recorded yet, it is set to 0 for status 'P', else to 1.
        """
        if self.bsub_out_fd:
            self.bsub_out_fd.close()
            # Drop the closed handle so that it cannot be mistaken for an
            # open one (a closed file object is still truthy).
            self.bsub_out_fd = None
        if self.exit_code is None:
            self.exit_code = 0 if status == 'P' else 1
        super()._post_finish(status, err_msg)

    @staticmethod
    def _post_finish_job_array(cfg, job_name, err_msg):
        """On LSF error, mark all jobs in this array as failed.

        err_msg is the error message indicating the cause of failure.
        """

        for job in LsfLauncher.jobs[cfg][job_name]:
            job._post_finish("F", ErrorMessage(line_number=None,
                                               message=err_msg,
                                               context=[]))
402 context=[]))