blob: a5d869af12845f19d898a4379191008fe69cd4dc [file] [log] [blame]
Sharon Topaz1fb830b2021-06-03 22:22:56 +03001# Copyright lowRISC contributors.
2# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3# SPDX-License-Identifier: Apache-2.0
4# ------------------------------------
5# SgeLauncher Class
6#
7# ------------------------------------
8import os
9import shlex
10import subprocess
11from subprocess import PIPE, Popen
12
13import SGE
14from Launcher import ErrorMessage, Launcher, LauncherError
15
16global job_name
17
18pid = os.getpid()
19
20
21class SgeLauncher(Launcher):
22 """
23 Implementation of Launcher to launch jobs in the user's local workstation.
24 """
25
26 # Misc common SgeLauncher settings.
27 max_odirs = 5
28
29 def __init__(self, deploy):
30 '''Initialize common class members.'''
31
32 super().__init__(deploy)
33
34 # Popen object when launching the job.
35 self.process = None
36
37 def _do_launch(self):
38 global job_name
39 # Update the shell's env vars with self.exports. Values in exports must
40 # replace the values in the shell's env vars if the keys match.
41 exports = os.environ.copy()
42 if self.deploy.exports:
43 exports.update(self.deploy.exports)
44
45 # Clear the magic MAKEFLAGS variable from exports if necessary. This
46 # variable is used by recursive Make calls to pass variables from one
47 # level to the next. Here, self.cmd is a call to Make but it's
48 # logically a top-level invocation: we don't want to pollute the flow's
49 # Makefile with Make variables from any wrapper that called dvsim.
50 if 'MAKEFLAGS' in exports:
51 del exports['MAKEFLAGS']
52
53 self._dump_env_vars(exports)
54
55 try:
56 f = open(self.deploy.get_log_path(),
57 "w",
58 encoding="UTF-8",
59 errors="surrogateescape")
60 f.write("[Executing]:\n{}\n\n".format(self.deploy.cmd))
61 f.flush()
62 # ---------- prepare SGE job struct -----
63 sgeJob = SGE.qsubOptions()
64 sgeJob.args.N = 'VCS_RUN_' + str(pid) # Name of Grid Engine job
65 if "build.log" in self.deploy.get_log_path():
66 sgeJob.args.N = 'VCS_BUILD_' + str(
67 pid) # Name of Grid Engine job
68
69 job_name = sgeJob.args.N
70 sgeJob.args.t = '0' # Define an array job with 20 subjobs
71 sgeJob.args.slot = '1' # Define num of slot
72 sgeJob.args.sync = 'y' # wait for job to complete before exiting
73 sgeJob.args.q = 'vcs_q' # Define the sge queue name
74 sgeJob.args.p = '0' # Set priority to 0
75 sgeJob.args.ll = 'mf=20G' # memory req,request the given resources
76 # pecifies a range of priorities from -1023 to 1024.
77 # The higher the number, the higher the priority.
78 # The default priority for jobs is zero
79 sgeJob.args.command = '"' + self.deploy.cmd + '"'
80 sgeJob.args.b = 'y' # This is a binary file
81 sgeJob.args.o = self.deploy.get_log_path() + '.sge'
82 cmd = str(sgeJob.execute(mode='echo'))
Sharon Topazff689db2022-10-12 11:58:35 +030083 print('INFO: SGE command line : "' + str(cmd) + '"')
Sharon Topaz1fb830b2021-06-03 22:22:56 +030084 # ---------------
85 self.process = subprocess.Popen(shlex.split(cmd),
86 bufsize=4096,
87 universal_newlines=True,
88 stdout=f,
89 stderr=f,
90 env=exports)
91 f.close()
92 except subprocess.SubprocessError as e:
93 raise LauncherError('IO Error: {}\nSee {}'.format(
94 e, self.deploy.get_log_path()))
95 finally:
96 self._close_process()
97
98 self._link_odir("D")
99 f.close()
100
101 def poll(self):
102 '''Check status of the running process
103
104 This returns 'D', 'P' or 'F'. If 'D', the job is still running. If 'P',
105 the job finished successfully. If 'F', the job finished with an error.
106
107 This function must only be called after running self.dispatch_cmd() and
108 must not be called again once it has returned 'P' or 'F'.
109 '''
110
111 assert self.process is not None
112 if self.process.poll() is None:
113 return 'D'
114 # -------------------------------------
115 # copy SGE jobb results to log file
116 if os.path.exists(self.deploy.get_log_path() + '.sge'):
117
Sharon Topazaaad9d32022-10-20 20:09:31 +0300118 file1 = open(self.deploy.get_log_path() + '.sge', 'r', errors='replace')
Sharon Topaz1fb830b2021-06-03 22:22:56 +0300119 Lines = file1.readlines()
120 file1.close()
121 f = open(self.deploy.get_log_path(),
122 "a",
123 encoding="UTF-8",
124 errors="surrogateescape")
125 for line in Lines:
126 f.write(line)
127 f.flush()
128 os.remove(self.deploy.get_log_path() + '.sge')
129 f.close()
130 # -------------------------------------
131
132 self.exit_code = self.process.returncode
133 status, err_msg = self._check_status()
134 self._post_finish(status, err_msg)
135 return status
136
137 def kill(self):
138 global job_name
139 '''Kill the running process.
140
141 This must be called between dispatching and reaping the process (the
142 same window as poll()).
143
144 '''
145 assert self.process is not None
146
147 # Try to kill the running process. Send SIGTERM first, wait a bit,
148 # and then send SIGKILL if it didn't work.
149 self.process.terminate()
150 try:
151 self.process.wait(timeout=2)
152 except subprocess.TimeoutExpired:
153 self.process.kill()
154 # ----------------------------
155 # qdel -f kill sge job_name
156 cmd = 'qstatus -a | grep ' + job_name
157 with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process:
158 output = process.communicate()[0].decode("utf-8")
159 output = output.rstrip("\n")
160 if output != '':
161 output_l = output.split()
162 cmd = 'qdel ' + output_l[0]
163 with Popen(cmd, stdout=PIPE, stderr=None,
164 shell=True) as process:
165 output = process.communicate()[0].decode("utf-8")
166 output = output.rstrip("\n")
167 print('Killed job "' + str(output) + '"')
168 # ----------------------------
169 self._post_finish(
170 'K',
171 ErrorMessage(line_number=None, message='Job killed!', context=[]))
172
173 def _post_finish(self, status, err_msg):
174 super()._post_finish(status, err_msg)
175 self._close_process()
176 self.process = None
177
178 def _close_process(self):
179 '''Close the file descriptors associated with the process.'''
180
181 assert self.process
182 if self.process.stdout:
183 self.process.stdout.close()