Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ config:
user: matt # ssh login user
working_dir: 'c:\temp' # working folder on remote host, required for streaming type
ffmpeg: 'c:/ffmpeg/bin/ffmpeg'
remote_copy_cmd: 'scp -T' # command used to copy to and from remote machine
profiles: # profiles allowed on this host
- hevc_cuda
- hevc_qsv
Expand Down Expand Up @@ -175,6 +176,7 @@ config:
| user | User to log into this host as via ssh. The user must be pre-authenticated to the host so that a password is not required. See https://www.ssh.com/ssh/copy-id. **[1]** |
| ffmpeg | Path on the host to ffmpeg. |
| working_dir | Indicates the temporary directory to use for encoding. **[2]** |
| remote_copy_cmd | Optional. Program used to copy files to and from the remote machine. Defaults to 'scp -T'. Values starting with '[' are interpreted as a python list of arguments (e.g. '["scp", "-T"]'), to allow for program path with spaces. Program must support the scp syntax (user@remote:/path/file) in both directions. Default value probably does not work with openssh versions older than January 2019. One alternative is 'rsync -e ssh'. |
| profiles | The allowed profiles to use for all encodes on this host. If not provided, assumes all. A video input matching a profile that is not assigned to a particular host will be run on a host that will, if any. This is how, for example, you restrict CPU-based encodings to hosts with no hardware acceleration - or vice versa. In other words, you control how each host is used by which profiles it supports. |
| path-substitutions | Optional. Applicable only to *mounted* type hosts. Use when the server media files and host mount paths are different. |
| queues | Optional. You can define per-host queues to enable concurrent jobs on each host. If not given, encoding jobs will run 1 at a time. See README.md for further discussion of queues. |
Expand Down
44 changes: 33 additions & 11 deletions pytranscoder/cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Cluster support
"""
import ast
import datetime
import os
import shutil
Expand Down Expand Up @@ -58,6 +59,22 @@ def working_dir(self):
def host_type(self):
return self.props['type']

@property
def remote_copy_cmd(self) -> [str]:
if 'remote_copy_cmd' not in self.props:
return ['scp', '-T']

if self.props['remote_copy_cmd'][0] == '[':
try:
cmd = ast.literal_eval(self.props['remote_copy_cmd'])
except SyntaxError:
print(f'failed to parse {self.props["remote_copy_cmd"]} as python list')
sys.exit(1)
else:
cmd = self.props['remote_copy_cmd'].split()

return cmd

def get_processor(self) -> Processor:
# match first available processor (for info parsing use only)
if self.ffmpeg_path:
Expand Down Expand Up @@ -333,13 +350,15 @@ def go(self):
if _profile.is_ffmpeg:
if job.media_info.is_multistream() and self.configfile.automap and _profile.automap:
ooutput = ooutput + job.media_info.ffmpeg_streams(_profile)
cmd = ['-y', *oinput, '-i', self.converted_path(remote_inpath),
*ooutput, self.converted_path(remote_outpath)]
cmd = ['-y', *oinput, '-i', f'"{self.converted_path(remote_inpath)}"',
*ooutput, f'"{self.converted_path(remote_outpath)}"']
processor_cmd = self.props.ffmpeg_path
else:
cmd = ['-i', self.converted_path(remote_inpath), *oinput,
*ooutput, '-o', self.converted_path(remote_outpath)]
cmd = ['-i', f'"{self.converted_path(remote_inpath)}"', *oinput,
*ooutput, '-o', f'"{self.converted_path(remote_outpath)}']
processor_cmd = self.props.hbcli_path

cli = [*ssh_cmd, *cmd]
cli = [*ssh_cmd, processor_cmd, *cmd]

#
# display useful information
Expand All @@ -365,7 +384,9 @@ def go(self):
# trick to make scp work on the Windows side
target_dir = '/' + remote_working_dir

scp = ['scp', inpath, self.props.user + '@' + self.props.ip + ':' + target_dir]
cp_cmd = self.props.remote_copy_cmd

scp = [*cp_cmd, inpath, self.props.user + '@' + self.props.ip + ':"' + target_dir + '"']
self.log(' '.join(scp))

code, output = run(scp)
Expand Down Expand Up @@ -410,7 +431,8 @@ def hb_log_callback(stats):
# copy results back to local
#
retrieved_copy_name = os.path.join(gettempdir(), os.path.basename(remote_outpath))
cmd = ['scp', self.props.user + '@' + self.props.ip + ':' + remote_outpath, retrieved_copy_name]
cmd = [*cp_cmd, self.props.user + '@' + self.props.ip + ':"' + remote_outpath + '"',
retrieved_copy_name]
self.log(' '.join(cmd))

code, output = run(cmd)
Expand Down Expand Up @@ -443,11 +465,11 @@ def hb_log_callback(stats):
if self.props.is_windows():
remote_outpath = self.converted_path(remote_outpath)
remote_inpath = self.converted_path(remote_inpath)
self.run_process([*ssh_cmd, f'"del {remote_outpath}"'])
self.run_process([*ssh_cmd, f'"del {remote_inpath}"'])
self.run_process([*ssh_cmd, 'del', f'"{remote_outpath}"'])
self.run_process([*ssh_cmd, 'del', f'"{remote_inpath}"'])
else:
self.run_process([*ssh_cmd, f'"rm {remote_outpath}"'])
self.run_process([*ssh_cmd, f'"rm {remote_inpath}"'])
self.run_process([*ssh_cmd, 'rm', f'"{remote_outpath}"'])
self.run_process([*ssh_cmd, 'rm', f'"{remote_inpath}"'])

finally:
self.queue.task_done()
Expand Down