#!/bin/sh # -*- coding: utf-8 -*- # pylint: disable=line-too-long # pylint: disable=too-many-statements # pylint: disable=deprecated-module # $Id: vboxshell.py 106061 2024-09-16 14:03:52Z vboxsync $ # The following checks for the right (i.e. most recent) Python binary available # and re-starts the script using that binary (like a shell wrapper). # # Using a shebang like "#!/bin/env python" on newer Fedora/Debian distros is banned [1] # and also won't work on other newer distros (Ubuntu >= 23.10), as those only ship # python3 without a python->python3 symlink anymore. # # Note: As Python 2 is EOL, we consider this last (and hope for the best). # # [1] https://lists.fedoraproject.org/archives/list/devel@lists.fedoraproject.org/message/2PD5RNJRKPN2DVTNGJSBHR5RUSVZSDZI/ ''':' for python_bin in python3 python python2 do type "$python_bin" > /dev/null 2>&1 && exec "$python_bin" "$0" "$@" done echo >&2 "ERROR: Python not found! Please install this first in order to run this program." exit 1 ':''' from __future__ import print_function # VirtualBox Python Shell. # # This program is a simple interactive shell for VirtualBox. You can query # information and issue commands from a simple command line. # # It also provides you with examples on how to use VirtualBox's Python API. # This shell is even somewhat documented, supports TAB-completion and # history if you have Python readline installed. # # Finally, shell allows arbitrary custom extensions, just create # .VirtualBox/shexts/ and drop your extensions there. # Enjoy. # # P.S. Our apologies for the code quality. __copyright__ = \ """ Copyright (C) 2009-2024 Oracle and/or its affiliates. This file is part of VirtualBox base platform packages, as available from https://www.virtualbox.org. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, in version 3 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . SPDX-License-Identifier: GPL-3.0-only """ __version__ = "$Revision: 106061 $" import gc import os import sys import traceback import shlex import tempfile import time import re import platform from optparse import OptionParser # # Global Variables # g_fBatchMode = False g_sScriptFile = None g_sCmd = None g_fHasReadline = True try: import readline import rlcompleter except ImportError: g_fHasReadline = False g_sPrompt = "vbox> " g_fHasColors = True g_dTermColors = { 'red': '\033[31m', 'blue': '\033[94m', 'green': '\033[92m', 'yellow': '\033[93m', 'magenta': '\033[35m', 'cyan': '\033[36m' } def colored(strg, color): """ Translates a string to one including coloring settings, if enabled. """ if not g_fHasColors: return strg col = g_dTermColors.get(color, None) if col: return col+str(strg)+'\033[0m' return strg if g_fHasReadline: class CompleterNG(rlcompleter.Completer): def __init__(self, dic, ctx): self.ctx = ctx rlcompleter.Completer.__init__(self, dic) def complete(self, text, state): """ taken from: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/496812 """ if text == "": return ['\t', None][state] return rlcompleter.Completer.complete(self, text, state) def canBePath(self, _phrase, word): return word.startswith('/') def canBeCommand(self, phrase, _word): spaceIdx = phrase.find(" ") begIdx = readline.get_begidx() firstWord = (spaceIdx == -1 or begIdx < spaceIdx) if firstWord: return True if phrase.startswith('help'): return True return False def canBeMachine(self, phrase, word): return not self.canBePath(phrase, word) and not self.canBeCommand(phrase, word) def global_matches(self, text): """ Compute matches when text is a simple name. Return a list of all names currently defined in self.namespace that match. """ matches = [] phrase = readline.get_line_buffer() try: if self.canBePath(phrase, text): (directory, rest) = os.path.split(text) c = len(rest) for word in os.listdir(directory): if c == 0 or word[:c] == rest: matches.append(os.path.join(directory, word)) if self.canBeCommand(phrase, text): c = len(text) for lst in [ self.namespace ]: for word in lst: if word[:c] == text: matches.append(word) if self.canBeMachine(phrase, text): c = len(text) for mach in getMachines(self.ctx, False, True): # although it has autoconversion, we need to cast # explicitly for subscripts to work word = re.sub("(?' self.id = mach.id # pylint: disable=invalid-name def cacheMachines(_ctx, lst): result = [] for mach in lst: elem = CachedMach(mach) result.append(elem) return result def getMachines(ctx, invalidate = False, simple=False): if ctx['vb'] is not None: if ctx['_machlist'] is None or invalidate: ctx['_machlist'] = ctx['global'].getArray(ctx['vb'], 'machines') ctx['_machlistsimple'] = cacheMachines(ctx, ctx['_machlist']) if simple: return ctx['_machlistsimple'] return ctx['_machlist'] return [] def asState(var): if var: return colored('on', 'green') return colored('off', 'green') def asFlag(var): if var: return 'yes' return 'no' def getFacilityStatus(ctx, guest, facilityType): (status, _timestamp) = guest.getFacilityStatus(facilityType) return asEnumElem(ctx, 'AdditionsFacilityStatus', status) def perfStats(ctx, mach): if not ctx['perf']: return for metric in ctx['perf'].query(["*"], [mach]): print(metric['name'], metric['values_as_string']) def guestExec(_ctx, _machine, _console, cmds): exec(cmds) # pylint: disable=exec-used def printMouseEvent(_ctx, mev): print("Mouse: mode=%d x=%d y=%d z=%d w=%d buttons=%x" % (mev.mode, mev.x, mev.y, mev.z, mev.w, mev.buttons)) def printKbdEvent(ctx, kev): print("Kbd: ", ctx['global'].getArray(kev, 'scancodes')) def printMultiTouchEvent(ctx, mtev): print("MultiTouch: %s contacts=%d time=%d" \ % ("touchscreen" if mtev.isTouchScreen else "touchpad", mtev.contactCount, mtev.scanTime)) xPositions = ctx['global'].getArray(mtev, 'xPositions') yPositions = ctx['global'].getArray(mtev, 'yPositions') contactIds = ctx['global'].getArray(mtev, 'contactIds') contactFlags = ctx['global'].getArray(mtev, 'contactFlags') for i in range(0, mtev.contactCount): print(" [%d] %d,%d %d %d" % (i, xPositions[i], yPositions[i], contactIds[i], contactFlags[i])) def monitorSource(ctx, eventSource, active, dur): def handleEventImpl(event): evtype = event.type print("got event: %s %s" % (str(evtype), asEnumElem(ctx, 'VBoxEventType', evtype))) if evtype == ctx['global'].constants.VBoxEventType_OnMachineStateChanged: scev = ctx['global'].queryInterface(event, 'IMachineStateChangedEvent') if scev: print("machine state event: mach=%s state=%s" % (scev.machineId, scev.state)) elif evtype == ctx['global'].constants.VBoxEventType_OnSnapshotTaken: stev = ctx['global'].queryInterface(event, 'ISnapshotTakenEvent') if stev: print("snapshot taken event: mach=%s snap=%s" % (stev.machineId, stev.snapshotId)) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestPropertyChanged: gpcev = ctx['global'].queryInterface(event, 'IGuestPropertyChangedEvent') if gpcev: if gpcev.fWasDeleted is True: print("property %s was deleted" % (gpcev.name)) else: print("guest property change: name=%s value=%s flags='%s'" % (gpcev.name, gpcev.value, gpcev.flags)) elif evtype == ctx['global'].constants.VBoxEventType_OnMousePointerShapeChanged: psev = ctx['global'].queryInterface(event, 'IMousePointerShapeChangedEvent') if psev: shape = ctx['global'].getArray(psev, 'shape') if shape is None: print("pointer shape event - empty shape") else: print("pointer shape event: w=%d h=%d shape len=%d" % (psev.width, psev.height, len(shape))) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestMouse: mev = ctx['global'].queryInterface(event, 'IGuestMouseEvent') if mev: printMouseEvent(ctx, mev) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestKeyboard: kev = ctx['global'].queryInterface(event, 'IGuestKeyboardEvent') if kev: printKbdEvent(ctx, kev) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestMultiTouch: mtev = ctx['global'].queryInterface(event, 'IGuestMultiTouchEvent') if mtev: printMultiTouchEvent(ctx, mtev) class EventListener(object): def __init__(self, arg): pass def handleEvent(self, event): try: # a bit convoluted QI to make it work with MS COM handleEventImpl(ctx['global'].queryInterface(event, 'IEvent')) except: traceback.print_exc() if active: listener = ctx['global'].createListener(EventListener) else: listener = eventSource.createListener() registered = False if dur == -1: # not infinity, but close enough dur = 100000 try: eventSource.registerListener(listener, [ctx['global'].constants.VBoxEventType_Any], active) registered = True end = time.time() + dur while time.time() < end: if active: ctx['global'].waitForEvents(500) else: event = eventSource.getEvent(listener, 500) if event: handleEventImpl(event) # otherwise waitable events will leak (active listeners ACK automatically) eventSource.eventProcessed(listener, event) # We need to catch all exceptions here, otherwise listener will never be unregistered except: traceback.print_exc() if listener and registered: eventSource.unregisterListener(listener) g_tsLast = 0 def recordDemo(ctx, console, filename, dur): global g_tsLast g_tsLast = time.time() def stamp(): global g_tsLast tsCur = time.time() timePassed = int((tsCur-g_tsLast)*1000) g_tsLast = tsCur return timePassed def handleEventImpl(event): evtype = event.type #print("got event: %s %s" % (str(evtype), asEnumElem(ctx, 'VBoxEventType', evtype))) if evtype == ctx['global'].constants.VBoxEventType_OnGuestMouse: mev = ctx['global'].queryInterface(event, 'IGuestMouseEvent') if mev: line = "%d: m %d %d %d %d %d %d\n" % (stamp(), mev.mode, mev.x, mev.y, mev.z, mev.w, mev.buttons) demo.write(line) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestKeyboard: kev = ctx['global'].queryInterface(event, 'IGuestKeyboardEvent') if kev: line = "%d: k %s\n" % (stamp(), str(ctx['global'].getArray(kev, 'scancodes'))) demo.write(line) listener = console.eventSource.createListener() registered = False # we create an aggregated event source to listen for multiple event sources (keyboard and mouse in our case) agg = console.eventSource.createAggregator([console.keyboard.eventSource, console.mouse.eventSource]) with open(filename, 'w', encoding='utf-8') as demo: header = "VM=" + console.machine.name + "\n" demo.write(header) if dur == -1: # not infinity, but close enough dur = 100000 try: agg.registerListener(listener, [ctx['global'].constants.VBoxEventType_Any], False) registered = True end = time.time() + dur while time.time() < end: event = agg.getEvent(listener, 1000) if event: handleEventImpl(event) # keyboard/mouse events aren't waitable, so no need for eventProcessed # We need to catch all exceptions here, otherwise listener will never be unregistered except: traceback.print_exc() demo.close() if listener and registered: agg.unregisterListener(listener) def playbackDemo(ctx, console, filename, dur): if dur == -1: # not infinity, but close enough dur = 100000 with open(filename, 'r', encoding='utf-8') as demo: header = demo.readline() if g_fVerbose: print("Header is", header) basere = re.compile(r'(?P\d+): (?P[km]) (?P

.*)') mre = re.compile(r'(?P\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+)') kre = re.compile(r'\d+') kbd = console.keyboard mouse = console.mouse try: end = time.time() + dur for line in demo: if time.time() > end: break match = basere.search(line) if match is None: continue rdict = match.groupdict() stamp = rdict['s'] params = rdict['p'] rtype = rdict['t'] time.sleep(float(stamp)/1000) if rtype == 'k': codes = kre.findall(params) if g_fVerbose: print("KBD:", codes) kbd.putScancodes(codes) elif rtype == 'm': mouseEvent = mre.search(params) if mouseEvent is not None: mdict = mouseEvent.groupdict() if mdict['a'] == '1': if g_fVerbose: print("MA: ", mdict['x'], mdict['y'], mdict['z'], mdict['b']) mouse.putMouseEventAbsolute(int(mdict['x']), int(mdict['y']), int(mdict['z']), int(mdict['w']), int(mdict['b'])) else: if g_fVerbose: print("MR: ", mdict['x'], mdict['y'], mdict['b']) mouse.putMouseEvent(int(mdict['x']), int(mdict['y']), int(mdict['z']), int(mdict['w']), int(mdict['b'])) # We need to catch all exceptions here, to close file except KeyboardInterrupt: ctx['interrupt'] = True except: traceback.print_exc() demo.close() def takeScreenshot(ctx, console, args): display = console.display if len(args) > 0: filename = args[0] else: filename = os.path.join(tempfile.gettempdir(), "screenshot.png") if len(args) > 3: screen = int(args[3]) else: screen = 0 (fbw, fbh, _fbbpp, _fbx, _fby, _) = display.getScreenResolution(screen) if len(args) > 1: width = int(args[1]) else: width = fbw if len(args) > 2: height = int(args[2]) else: height = fbh print("Saving screenshot (%d x %d) screen %d in %s..." % (width, height, screen, filename)) data = display.takeScreenShotToArray(screen, width, height, ctx['const'].BitmapFormat_PNG) with open(filename, 'wb') as pngfile: pngfile.write(data) pngfile.close() def teleport(ctx, _session, console, args): if args[0].find(":") == -1: print("Use host:port format for teleport target") return (host, port) = args[0].split(":") if len(args) > 1: passwd = args[1] else: passwd = "" if len(args) > 2: maxDowntime = int(args[2]) else: maxDowntime = 250 port = int(port) print("Teleporting to %s:%d..." % (host, port)) progress = console.teleport(host, port, passwd, maxDowntime) if progressBar(ctx, progress, 100) and int(progress.resultCode) == 0: print("Success!") else: reportError(ctx, progress) def guestStats(ctx, console, args): guest = console.guest if not guest: print("Guest is not in a running state") return # we need to set up guest statistics if len(args) > 0 : update = args[0] else: update = 1 if guest.statisticsUpdateInterval != update: guest.statisticsUpdateInterval = update try: time.sleep(float(update)+0.1) except: # to allow sleep interruption pass all_stats = ctx['const'].all_values('GuestStatisticType') cpu = 0 for s in list(all_stats.keys()): try: val = guest.getStatistic( cpu, all_stats[s]) print("%s: %d" % (s, val)) except: # likely not implemented pass def plugCpu(_ctx, machine, _session, args): cpu = int(args[0]) print("Adding CPU %d..." % (cpu)) machine.hotPlugCPU(cpu) def unplugCpu(_ctx, machine, _session, args): cpu = int(args[0]) print("Removing CPU %d..." % (cpu)) machine.hotUnplugCPU(cpu) def mountIso(_ctx, machine, _session, args): machine.mountMedium(args[0], args[1], args[2], args[3], args[4]) machine.saveSettings() def cond(condToCheck, resTrue, resFalse): if condToCheck: return resTrue return resFalse def printHostUsbDev(ctx, usbdev): print(" %s: %s (vendorId=%d productId=%d serial=%s) %s" \ % (usbdev.id, colored(usbdev.product, 'blue'), usbdev.vendorId, usbdev.productId, usbdev.serialNumber, asEnumElem(ctx, 'USBDeviceState', usbdev.state))) def printUsbDev(_ctx, usbdev): print(" %s: %s (vendorId=%d productId=%d serial=%s)" \ % (usbdev.id, colored(usbdev.product, 'blue'), usbdev.vendorId, usbdev.productId, usbdev.serialNumber)) def printSf(ctx, sharedfolder): print(" name=%s host=%s %s %s" \ % (sharedfolder.name, colPath(ctx, sharedfolder.hostPath), cond(sharedfolder.accessible, "accessible", "not accessible"), cond(sharedfolder.writable, "writable", "read-only"))) def ginfo(ctx, console, _args): guest = console.guest if not guest: print("Guest is not in a running state") return if guest.additionsRunLevel != ctx['const'].AdditionsRunLevelType_None: print("Additions active, version %s" % (guest.additionsVersion)) print("Support seamless: %s" % (getFacilityStatus(ctx, guest, ctx['const'].AdditionsFacilityType_Seamless))) print("Support graphics: %s" % (getFacilityStatus(ctx, guest, ctx['const'].AdditionsFacilityType_Graphics))) print("Balloon size: %d" % (guest.memoryBalloonSize)) print("Statistic update interval: %d" % (guest.statisticsUpdateInterval)) else: print("No additions") usbs = ctx['global'].getArray(console, 'USBDevices') print("Attached USB:") for usbdev in usbs: printUsbDev(ctx, usbdev) rusbs = ctx['global'].getArray(console, 'remoteUSBDevices') print("Remote USB:") for usbdev in rusbs: printHostUsbDev(ctx, usbdev) print("Transient shared folders:") sfs = rusbs = ctx['global'].getArray(console, 'sharedFolders') for sharedfolder in sfs: printSf(ctx, sharedfolder) def cmdExistingVm(ctx, mach, cmd, args): session = None try: session = ctx['global'].openMachineSession(mach, fPermitSharing=True) except Exception as e: printErr(ctx, "Session to '%s' not open: %s" % (mach.name, str(e))) if g_fVerbose: traceback.print_exc() return if session.state != ctx['const'].SessionState_Locked: print("Session to '%s' in wrong state: %s" % (mach.name, session.state)) session.unlockMachine() return # this could be an example how to handle local only (i.e. unavailable # in Webservices) functionality if ctx['remote'] and cmd == 'some_local_only_command': print('Trying to use local only functionality, ignored') session.unlockMachine() return console = session.console ops = {'pause': console.pause(), 'resume': console.resume(), 'powerdown': console.powerDown(), 'powerbutton': console.powerButton(), 'stats': lambda: perfStats(ctx, mach), 'guest': lambda: guestExec(ctx, mach, console, args), 'ginfo': lambda: ginfo(ctx, console, args), 'guestlambda': lambda: args[0](ctx, mach, console, args[1:]), 'save': lambda: progressBar(ctx, session.machine.saveState()), 'screenshot': lambda: takeScreenshot(ctx, console, args), 'teleport': lambda: teleport(ctx, session, console, args), 'gueststats': lambda: guestStats(ctx, console, args), 'plugcpu': lambda: plugCpu(ctx, session.machine, session, args), 'unplugcpu': lambda: unplugCpu(ctx, session.machine, session, args), 'mountiso': lambda: mountIso(ctx, session.machine, session, args), } try: ops[cmd]() except KeyboardInterrupt: ctx['interrupt'] = True except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() session.unlockMachine() def cmdClosedVm(ctx, mach, cmd, args=None, save=True): session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine try: cmd(ctx, mach, args) except Exception as e: save = False printErr(ctx, e) if g_fVerbose: traceback.print_exc() if save: try: mach.saveSettings() except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() ctx['global'].closeMachineSession(session) def cmdAnyVm(ctx, mach, cmd, args=None, save=False): session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine try: cmd(ctx, mach, session.console, args) except Exception as e: save = False printErr(ctx, e) if g_fVerbose: traceback.print_exc() if save: mach.saveSettings() ctx['global'].closeMachineSession(session) def machById(ctx, uuid): mach = ctx['vb'].findMachine(uuid) return mach class XPathNode: def __init__(self, parent, obj, ntype): self.parent = parent self.obj = obj self.ntype = ntype def lookup(self, subpath): children = self.enum() matches = [] for e in children: if e.matches(subpath): matches.append(e) return matches def enum(self): return [] def matches(self, subexp): if subexp == self.ntype: return True if not subexp.startswith(self.ntype): return False match = re.search(r"@(?P\w+)=(?P[^\'\[\]]+)", subexp) matches = False try: if match is not None: xdict = match.groupdict() attr = xdict['a'] val = xdict['v'] matches = str(getattr(self.obj, attr)) == val except: pass return matches def apply(self, cmd): exec(cmd, {'obj':self.obj, 'node':self, 'ctx':self.getCtx()}, {}) # pylint: disable=exec-used def getCtx(self): if hasattr(self, 'ctx'): return self.ctx return self.parent.getCtx() class XPathNodeHolder(XPathNode): def __init__(self, parent, obj, attr, heldClass, xpathname): XPathNode.__init__(self, parent, obj, 'hld '+xpathname) self.attr = attr self.heldClass = heldClass self.xpathname = xpathname def enum(self): children = [] for node in self.getCtx()['global'].getArray(self.obj, self.attr): nodexml = self.heldClass(self, node) children.append(nodexml) return children def matches(self, subexp): return subexp == self.xpathname class XPathNodeValue(XPathNode): def __init__(self, parent, obj, xpathname): XPathNode.__init__(self, parent, obj, 'val '+xpathname) self.xpathname = xpathname def matches(self, subexp): return subexp == self.xpathname class XPathNodeHolderVM(XPathNodeHolder): def __init__(self, parent, vbox): XPathNodeHolder.__init__(self, parent, vbox, 'machines', XPathNodeVM, 'vms') class XPathNodeVM(XPathNode): def __init__(self, parent, obj): XPathNode.__init__(self, parent, obj, 'vm') #def matches(self, subexp): # return subexp=='vm' def enum(self): return [XPathNodeHolderNIC(self, self.obj), XPathNodeValue(self, self.obj.BIOSSettings, 'bios'), ] class XPathNodeHolderNIC(XPathNodeHolder): def __init__(self, parent, mach): XPathNodeHolder.__init__(self, parent, mach, 'nics', XPathNodeVM, 'nics') self.maxNic = mach.platform.properties.getMaxNetworkAdapters(mach.platform.chipsetType) def enum(self): children = [] for i in range(0, self.maxNic): node = XPathNodeNIC(self, self.obj.getNetworkAdapter(i)) children.append(node) return children class XPathNodeNIC(XPathNode): def __init__(self, parent, obj): XPathNode.__init__(self, parent, obj, 'nic') def matches(self, subexp): return subexp == 'nic' class XPathNodeRoot(XPathNode): def __init__(self, ctx): XPathNode.__init__(self, None, None, 'root') self.ctx = ctx def enum(self): return [XPathNodeHolderVM(self, self.ctx['vb'])] def matches(self, subexp): return True def eval_xpath(ctx, scope): pathnames = scope.split("/")[2:] nodes = [XPathNodeRoot(ctx)] for path in pathnames: seen = [] while len(nodes) > 0: node = nodes.pop() seen.append(node) for s in seen: matches = s.lookup(path) for match in matches: nodes.append(match) if len(nodes) == 0: break return nodes def argsToMach(ctx, args): if len(args) < 2: print("usage: %s " % (args[0])) return None uuid = args[1] mach = machById(ctx, uuid) if not mach: print("Machine '%s' is unknown, use list command to find available machines" % (uuid)) return mach def helpSingleCmd(cmd, help_text, from_ext): if from_ext != 0: spec = " [ext from "+from_ext+"]" else: spec = "" print(" %s: %s%s" % (colored(cmd, 'blue'), help_text, spec)) def helpCmd(_ctx, args): if len(args) == 1: print("Help page:") names = list(commands.keys()) names.sort() for i in names: helpSingleCmd(i, commands[i][0], commands[i][2]) else: cmd = args[1] c = commands.get(cmd) if not c: print("Command '%s' not known" % (cmd)) else: helpSingleCmd(cmd, c[0], c[2]) return 0 def asEnumElem(ctx, enum, elem): enumVals = ctx['const'].all_values(enum) for e in list(enumVals.keys()): if str(elem) == str(enumVals[e]): return colored(e, 'green') return colored("", 'green') def enumFromString(ctx, enum, strg): enumVals = ctx['const'].all_values(enum) return enumVals.get(strg, None) def listCmd(ctx, _args): for mach in getMachines(ctx, True): try: if mach.teleporterEnabled: tele = "[T] " else: tele = " " print("%sMachine '%s' [%s], machineState=%s, sessionState=%s" % (tele, colVm(ctx, mach.name), mach.id, asEnumElem(ctx, "MachineState", mach.state), asEnumElem(ctx, "SessionState", mach.sessionState))) except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() return 0 def infoCmd(ctx, args): if len(args) < 2: print("usage: info ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 try: vmos = ctx['vb'].getGuestOSType(mach.OSTypeId) except: vmos = None print(" One can use setvar to change variable, using name in [].") print(" Name [name]: %s" % (colVm(ctx, mach.name))) print(" Description [description]: %s" % (mach.description)) print(" ID [n/a]: %s" % (mach.id)) print(" OS Type [via OSTypeId]: %s" % (vmos.description if vmos is not None else mach.OSTypeId)) print(" Firmware [firmwareType]: %s (%s)" % (asEnumElem(ctx, "FirmwareType", mach.firmwareSettings.firmwareType), mach.firmwareSettings.firmwareType)) print() print(" CPUs [CPUCount]: %d" % (mach.CPUCount)) print(" RAM [memorySize]: %dM" % (mach.memorySize)) print(" VRAM [VRAMSize]: %dM" % (mach.graphicsAdapter.VRAMSize)) print(" Monitors [monitorCount]: %d" % (mach.graphicsAdapter.monitorCount)) print(" Chipset [chipsetType]: %s (%s)" % (asEnumElem(ctx, "ChipsetType", mach.platform.chipsetType), mach.platform.chipsetType)) print() print(" Clipboard mode [clipboardMode]: %s (%s)" % (asEnumElem(ctx, "ClipboardMode", mach.clipboardMode), mach.clipboardMode)) print(" Machine status [n/a]: %s (%s)" % (asEnumElem(ctx, "SessionState", mach.sessionState), mach.sessionState)) print() if mach.teleporterEnabled: print(" Teleport target on port %d (%s)" % (mach.teleporterPort, mach.teleporterPassword)) print() print(" ACPI [BIOSSettings.ACPIEnabled]: %s" % (asState(mach.firmwareSettings.ACPIEnabled))) print(" APIC [BIOSSettings.IOAPICEnabled]: %s" % (asState(mach.firmwareSettings.IOAPICEnabled))) if mach.platform.architecture == ctx['global'].constants.PlatformArchitecture_x86: hwVirtEnabled = mach.platform.x86.getHWVirtExProperty(ctx['global'].constants.HWVirtExPropertyType_Enabled) print(" Hardware virtualization [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_Enabled, value)]: " + asState(hwVirtEnabled)) hwVirtVPID = mach.platform.x86.getHWVirtExProperty(ctx['const'].HWVirtExPropertyType_VPID) print(" VPID support [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_VPID, value)]: " + asState(hwVirtVPID)) hwVirtNestedPaging = mach.platform.x86.getHWVirtExProperty(ctx['const'].HWVirtExPropertyType_NestedPaging) print(" Nested paging [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_NestedPaging, value)]: " + asState(hwVirtNestedPaging)) print(" HPET [HPETEnabled]: %s" % (asState(mach.platform.x86.HPETEnabled))) print(" Hardware 3d acceleration [accelerate3DEnabled]: " + asState(mach.graphicsAdapter.isFeatureEnabled(ctx['const'].GraphicsFeature_Acceleration3D))) print(" Hardware 2d video acceleration [accelerate2DVideoEnabled]: " + asState(mach.graphicsAdapter.isFeatureEnabled(ctx['const'].GraphicsFeature_Acceleration2DVideo))) print(" Use universal time [RTCUseUTC]: %s" % (asState(mach.platform.RTCUseUTC))) audioAdp = mach.audioSettings.adapter if audioAdp.enabled: print(" Audio [via audioAdapter]: chip %s; host driver %s" % (asEnumElem(ctx, "AudioControllerType", audioAdp.audioController), asEnumElem(ctx, "AudioDriverType", audioAdp.audioDriver))) print(" CPU hotplugging [CPUHotPlugEnabled]: %s" % (asState(mach.CPUHotPlugEnabled))) print(" Keyboard [keyboardHIDType]: %s (%s)" % (asEnumElem(ctx, "KeyboardHIDType", mach.keyboardHIDType), mach.keyboardHIDType)) print(" Pointing device [pointingHIDType]: %s (%s)" % (asEnumElem(ctx, "PointingHIDType", mach.pointingHIDType), mach.pointingHIDType)) print(" Last changed [n/a]: " + time.asctime(time.localtime(int(mach.lastStateChange)/1000))) # OSE has no VRDE try: print(" VRDE server [VRDEServer.enabled]: %s" % (asState(mach.VRDEServer.enabled))) except: pass print() print(colCat(ctx, " USB Controllers:")) for oUsbCtrl in ctx['global'].getArray(mach, 'USBControllers'): print(" '%s': type %s standard: %#x" \ % (oUsbCtrl.name, asEnumElem(ctx, "USBControllerType", oUsbCtrl.type), oUsbCtrl.USBStandard)) print() print(colCat(ctx, " I/O subsystem info:")) print(" Cache enabled [IOCacheEnabled]: %s" % (asState(mach.IOCacheEnabled))) print(" Cache size [IOCacheSize]: %dM" % (mach.IOCacheSize)) controllers = ctx['global'].getArray(mach, 'storageControllers') if controllers: print() print(colCat(ctx, " Storage Controllers:")) for controller in controllers: print(" '%s': bus %s type %s" % (controller.name, asEnumElem(ctx, "StorageBus", controller.bus), asEnumElem(ctx, "StorageControllerType", controller.controllerType))) attaches = ctx['global'].getArray(mach, 'mediumAttachments') if attaches: print() print(colCat(ctx, " Media:")) for att in attaches: print(" Controller: '%s' port/device: %d:%d type: %s (%s):" % (att.controller, att.port, att.device, asEnumElem(ctx, "DeviceType", att.type), att.type)) medium = att.medium if att.type == ctx['global'].constants.DeviceType_HardDisk: print(" HDD:") print(" Id: %s" % (medium.id)) print(" Location: %s" % (colPath(ctx, medium.location))) print(" Name: %s" % (medium.name)) print(" Format: %s" % (medium.format)) if att.type == ctx['global'].constants.DeviceType_DVD: print(" DVD:") if medium: print(" Id: %s" % (medium.id)) print(" Name: %s" % (medium.name)) if medium.hostDrive: print(" Host DVD %s" % (colPath(ctx, medium.location))) if att.passthrough: print(" [passthrough mode]") else: print(" Virtual image at %s" % (colPath(ctx, medium.location))) print(" Size: %s" % (medium.size)) if att.type == ctx['global'].constants.DeviceType_Floppy: print(" Floppy:") if medium: print(" Id: %s" % (medium.id)) print(" Name: %s" % (medium.name)) if medium.hostDrive: print(" Host floppy %s" % (colPath(ctx, medium.location))) else: print(" Virtual image at %s" % (colPath(ctx, medium.location))) print(" Size: %s" % (medium.size)) print() print(colCat(ctx, " Shared folders:")) for sharedfolder in ctx['global'].getArray(mach, 'sharedFolders'): printSf(ctx, sharedfolder) return 0 def startCmd(ctx, args): if len(args) < 2: print("usage: start ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 if len(args) > 2: vmtype = args[2] else: vmtype = "gui" startVm(ctx, mach, vmtype) return 0 def createVmCmd(ctx, args): if len(args) != 4: print("usage: createvm ") return 0 name = args[1] arch = args[2] oskind = args[3] try: ctx['vb'].getGuestOSType(oskind) except Exception: print('Unknown OS type:', oskind) return 0 createVm(ctx, name, arch, oskind) return 0 def ginfoCmd(ctx, args): if len(args) < 2: print("usage: ginfo ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'ginfo', '') return 0 def gstctlPrintOk(_ctx, string): return print(colored(string, 'green')) def gstctlPrintErr(_ctx, string): return print(colored(string, 'red')) def execInGuest(ctx, console, args, env, user, passwd, tmo, inputPipe=None, _outputPipe=None): if len(args) < 1: print("exec in guest needs at least program name") return 1 guest = console.guest # shall contain program name as argv[0] gargs = args if g_fVerbose: gstctlPrintOk(ctx, "starting guest session for user '%s' (password '%s')" % (user, passwd)) else: gstctlPrintOk(ctx, ("starting guest session for user '%s' ..." % (user))) try: guestSession = guest.createSession(user, passwd, "", "vboxshell guest exec") guestSession.waitForArray([ ctx['global'].constants.GuestSessionWaitForFlag_Start ], 30 * 1000) except Exception as e: gstctlPrintErr(ctx, "starting guest session failed:") printErr(ctx, e) return 1 if g_fVerbose: gstctlPrintOk(ctx, "guest session %d started" % guestSession.id) aProcCreateFlags = [ ctx['global'].constants.ProcessCreateFlag_WaitForStdOut, \ ctx['global'].constants.ProcessCreateFlag_WaitForStdErr ] if inputPipe is not None: aProcCreateFlags.extend([ ctx['global'].constants.ProcessCreateFlag_WaitForStdIn ]) if g_fVerbose: gstctlPrintOk(ctx, "starting process '%s' with args '%s' as user '%s' (password '%s')" % (args[0], gargs, user, passwd)) process = guestSession.processCreate(args[0], gargs, '', env, aProcCreateFlags, tmo) try: waitResult = process.waitForArray([ ctx['global'].constants.ProcessWaitForFlag_Start ], 30 * 1000) except Exception as e: gstctlPrintErr(ctx, "waiting for guest process start failed:") printErr(ctx, e) return 1 if waitResult != ctx['global'].constants.ProcessWaitResult_Start: gstctlPrintErr(ctx, "process start failed: got wait result %d, expected %d" \ % (waitResult, ctx['global'].constants.ProcessWaitResult_Start) ) return 1 procStatus = process.status if procStatus != ctx['global'].constants.ProcessStatus_Started: gstctlPrintErr(ctx, "process start failed: got process status %d, expected %d" \ % (procStatus, ctx['global'].constants.ProcessStatus_Started) ) return 1 if g_fVerbose: gstctlPrintOk(ctx, "process %d started" % (process.PID)) if process.PID != 0: try: fCompleted = False fReadStdOut = False fReadStdErr = False while not fCompleted: waitResult = process.waitForArray([ ctx['global'].constants.ProcessWaitForFlag_Terminate, \ ctx['global'].constants.ProcessWaitForFlag_StdOut, \ ctx['global'].constants.ProcessWaitForFlag_StdErr ], 1000) if waitResult == ctx['global'].constants.ProcessWaitResult_WaitFlagNotSupported: fReadStdOut = True fReadStdErr = True elif waitResult == ctx['global'].constants.ProcessWaitResult_Terminate: fCompleted = True break elif waitResult == ctx['global'].constants.ProcessWaitResult_Timeout: gstctlPrintErr(ctx, "timeout while waiting for process") break else: gstctlPrintErr(ctx, "got unhandled wait result %d" % (waitResult)) if inputPipe: indata = inputPipe(ctx) if indata is not None: write = len(indata) off = 0 while write > 0: written = process.write(0, 10*1000, indata[off:]) off = off + written write = write - written else: # EOF try: process.write(0, 10*1000, " ") except: pass if fReadStdOut: data = process.read(1, 64 * 1024, 10*1000) if data and len(data): sys.stdout.write(bytes(data).decode('utf-8')) fReadStdOut = False if fReadStdErr: data = process.read(2, 64 * 1024, 10*1000) if data and len(data): sys.stderr.write(bytes(data).decode('utf-8')) fReadStdErr = False ctx['global'].waitForEvents(0) if fCompleted: exitCode = process.exitCode if exitCode == 0: gstctlPrintOk(ctx, "process exit code: %d" % (exitCode)) else: gstctlPrintErr(ctx, "process exit code: %d" % (exitCode)) except KeyboardInterrupt: print("Interrupted.") ctx['interrupt'] = True except Exception as e: printErr(ctx, e) if guestSession: try: if g_fVerbose: gstctlPrintOk(ctx, "closing guest session ...") guestSession.close() except: printErr(ctx, e) return 0 def copyToGuest(ctx, console, args, user, passwd): src = args[0] dst = args[1] flags = 0 print("Copying host %s to guest %s" % (src, dst)) progress = console.guest.copyToGuest(src, dst, user, passwd, flags) progressBar(ctx, progress) def nh_raw_input(prompt=""): if prompt: sys.stdout.write(prompt) sys.stdout.flush() line = sys.stdin.readline() if not line: raise EOFError if line[-1] == '\n': line = line[:-1] return line def getCred(_ctx): import getpass user = getpass.getuser() if user: user_inp = nh_raw_input("User (%s): " % (user)) else: user_inp = nh_raw_input("User: ") if len(user_inp) > 0: user = user_inp passwd = getpass.getpass() return (user, passwd) def gexecCmd(ctx, args): if len(args) < 2: print("usage: gexec command args") return 0 mach = argsToMach(ctx, args) if not mach: return 0 gargs = args[2:] env = [] # ["DISPLAY=:0"] (user, passwd) = getCred(ctx) gargs.insert(0, lambda ctx, mach, console, args: execInGuest(ctx, console, args, env, user, passwd, 10000)) cmdExistingVm(ctx, mach, 'guestlambda', gargs) return 0 def gcopyCmd(ctx, args): if len(args) < 2: print("usage: gcopy host_path guest_path") return 0 mach = argsToMach(ctx, args) if not mach: return 0 gargs = args[2:] (user, passwd) = getCred(ctx) gargs.insert(0, lambda ctx, mach, console, args: copyToGuest(ctx, console, args, user, passwd)) cmdExistingVm(ctx, mach, 'guestlambda', gargs) return 0 def readCmdPipe(ctx, _hcmd): try: return ctx['process'].communicate()[0] except: return None def gpipeCmd(ctx, args): if len(args) < 4: print("usage: gpipe hostProgram guestProgram, such as gpipe linux '/bin/uname -a' '/bin/sh -c \"/usr/bin/tee; /bin/uname -a\"'") return 0 mach = argsToMach(ctx, args) if not mach: return 0 hcmd = args[2] gcmd = args[3] (user, passwd) = getCred(ctx) import subprocess with subprocess.Popen(split_no_quotes(hcmd), stdout=subprocess.PIPE) as ctx['process']: gargs = split_no_quotes(gcmd) env = [] gargs.insert(0, lambda ctx, mach, console, args: execInGuest(ctx, console, args, env, user, passwd, 10000, lambda ctx:readCmdPipe(ctx, hcmd))) cmdExistingVm(ctx, mach, 'guestlambda', gargs) try: ctx['process'].terminate() except: pass ctx['process'] = None return 0 def removeVmCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 removeVm(ctx, mach) return 0 def pauseCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'pause', '') return 0 def powerdownCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'powerdown', '') return 0 def powerbuttonCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'powerbutton', '') return 0 def resumeCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'resume', '') return 0 def saveCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'save', '') return 0 def statsCmd(ctx, args): mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'stats', '') return 0 def guestCmd(ctx, args): if len(args) < 3: print("usage: guest commands") return 0 mach = argsToMach(ctx, args) if not mach: return 0 if mach.state != ctx['const'].MachineState_Running: cmdClosedVm(ctx, mach, lambda ctx, mach, a: guestExec (ctx, mach, None, ' '.join(args[2:]))) else: cmdExistingVm(ctx, mach, 'guest', ' '.join(args[2:])) return 0 def screenshotCmd(ctx, args): if len(args) < 2: print("usage: screenshot ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'screenshot', args[2:]) return 0 def teleportCmd(ctx, args): if len(args) < 3: print("usage: teleport host:port ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'teleport', args[2:]) return 0 def portalsettings(_ctx, mach, args): enabled = args[0] mach.teleporterEnabled = enabled if enabled: port = args[1] passwd = args[2] mach.teleporterPort = port mach.teleporterPassword = passwd def openportalCmd(ctx, args): if len(args) < 3: print("usage: openportal port ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 port = int(args[2]) if len(args) > 3: passwd = args[3] else: passwd = "" if not mach.teleporterEnabled or mach.teleporterPort != port or passwd: cmdClosedVm(ctx, mach, portalsettings, [True, port, passwd]) startVm(ctx, mach, "gui") return 0 def closeportalCmd(ctx, args): if len(args) < 2: print("usage: closeportal ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 if mach.teleporterEnabled: cmdClosedVm(ctx, mach, portalsettings, [False]) return 0 def gueststatsCmd(ctx, args): if len(args) < 2: print("usage: gueststats ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdExistingVm(ctx, mach, 'gueststats', args[2:]) return 0 def plugcpu(_ctx, mach, args): plug = args[0] cpu = args[1] if plug: print("Adding CPU %d..." % (cpu)) mach.hotPlugCPU(cpu) else: print("Removing CPU %d..." % (cpu)) mach.hotUnplugCPU(cpu) def plugcpuCmd(ctx, args): if len(args) < 2: print("usage: plugcpu ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 if str(mach.sessionState) != str(ctx['const'].SessionState_Locked): if mach.CPUHotPlugEnabled: cmdClosedVm(ctx, mach, plugcpu, [True, int(args[2])]) else: cmdExistingVm(ctx, mach, 'plugcpu', args[2]) return 0 def unplugcpuCmd(ctx, args): if len(args) < 2: print("usage: unplugcpu ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 if str(mach.sessionState) != str(ctx['const'].SessionState_Locked): if mach.CPUHotPlugEnabled: cmdClosedVm(ctx, mach, plugcpu, [False, int(args[2])]) else: cmdExistingVm(ctx, mach, 'unplugcpu', args[2]) return 0 def setvar(_ctx, _mach, args): expr = 'mach.'+args[0]+' = '+args[1] print("Executing", expr) exec(expr) # pylint: disable=exec-used def setvarCmd(ctx, args): if len(args) < 4: print("usage: setvar ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdClosedVm(ctx, mach, setvar, args[2:]) return 0 def setvmextra(_ctx, mach, args): key = args[0] value = args[1] print("%s: setting %s to %s" % (mach.name, key, value if value else None)) mach.setExtraData(key, value) def setExtraDataCmd(ctx, args): if len(args) < 3: print("usage: setextra [vmname|uuid|global] key ") return 0 key = args[2] if len(args) == 4: value = args[3] else: value = '' if args[1] == 'global': ctx['vb'].setExtraData(key, value) return 0 mach = argsToMach(ctx, args) if not mach: return 0 cmdClosedVm(ctx, mach, setvmextra, [key, value]) return 0 def printExtraKey(obj, key, value): print("%s: '%s' = '%s'" % (obj, key, value)) def getExtraDataCmd(ctx, args): if len(args) < 2: print("usage: getextra [vmname|uuid|global] ") return 0 if len(args) == 3: key = args[2] else: key = None if args[1] == 'global': obj = ctx['vb'] else: obj = argsToMach(ctx, args) if not obj: return 0 if not key: keys = obj.getExtraDataKeys() else: keys = [ key ] for k in keys: printExtraKey(args[1], k, obj.getExtraData(k)) return 0 def quitCmd(_ctx, _args): return 1 def aliasCmd(_ctx, args): if len(args) == 3: aliases[args[1]] = args[2] return 0 for (key, value) in list(aliases.items()): print("'%s' is an alias for '%s'" % (key, value)) return 0 def verboseCmd(_ctx, args): global g_fVerbose if len(args) > 1: g_fVerbose = args[1]=='on' else: g_fVerbose = not g_fVerbose return 0 def colorsCmd(_ctx, args): global g_fHasColors if len(args) > 1: g_fHasColors = args[1] == 'on' else: g_fHasColors = not g_fHasColors return 0 def hostCmd(ctx, _args): vbox = ctx['vb'] try: print("VirtualBox version %s" % (colored(vbox.version, 'blue'))) except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() props = vbox.systemProperties print("Machines: %s" % (colPath(ctx, props.defaultMachineFolder))) #print("Global shared folders:") #for ud in ctx['global'].getArray(vbox, 'sharedFolders'): # printSf(ctx, sf) host = vbox.host cnt = host.processorCount print(colCat(ctx, "Processors:")) print(" available/online: %d/%d " % (cnt, host.processorOnlineCount)) for i in range(0, cnt): print(" processor #%d speed: %dMHz %s" % (i, host.getProcessorSpeed(i), host.getProcessorDescription(i))) print(colCat(ctx, "RAM:")) print(" %dM (free %dM)" % (host.memorySize, host.memoryAvailable)) print(colCat(ctx, "OS:")) print(" %s (%s)" % (host.operatingSystem, host.OSVersion)) print(colCat(ctx, "Network interfaces:")) for iface in ctx['global'].getArray(host, 'networkInterfaces'): print(" %s (%s)" % (iface.name, iface.IPAddress)) print(colCat(ctx, "DVD drives:")) for drive in ctx['global'].getArray(host, 'DVDDrives'): print(" %s - %s" % (drive.name, drive.description)) print(colCat(ctx, "Floppy drives:")) for drive in ctx['global'].getArray(host, 'floppyDrives'): print(" %s - %s" % (drive.name, drive.description)) print(colCat(ctx, "USB devices:")) for usbdev in ctx['global'].getArray(host, 'USBDevices'): printHostUsbDev(ctx, usbdev) if ctx['perf']: for metric in ctx['perf'].query(["*"], [host]): print(metric['name'], metric['values_as_string']) return 0 def monitorGuestCmd(ctx, args): if len(args) < 2: print("usage: monitorGuest (duration)") return 0 mach = argsToMach(ctx, args) if not mach: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.eventSource, active, dur)]) return 0 def monitorGuestKbdCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestKbd name (duration)") return 0 mach = argsToMach(ctx, args) if not mach: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.keyboard.eventSource, active, dur)]) return 0 def monitorGuestMouseCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestMouse name (duration)") return 0 mach = argsToMach(ctx, args) if not mach: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.mouse.eventSource, active, dur)]) return 0 def monitorGuestMultiTouchCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestMultiTouch name (duration)") return 0 mach = argsToMach(ctx, args) if not mach: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.mouse.eventSource, active, dur)]) return 0 def monitorVBoxCmd(ctx, args): if len(args) > 2: print("usage: monitorVBox (duration)") return 0 dur = 5 if len(args) > 1: dur = float(args[1]) vbox = ctx['vb'] active = False monitorSource(ctx, vbox.eventSource, active, dur) return 0 def getAdapterType(ctx, natype): if (natype in ( ctx['global'].constants.NetworkAdapterType_Am79C970A , ctx['global'].constants.NetworkAdapterType_Am79C973 , ctx['global'].constants.NetworkAdapterType_Am79C960)): return "pcnet" if (natype in ( ctx['global'].constants.NetworkAdapterType_I82540EM , ctx['global'].constants.NetworkAdapterType_I82545EM , ctx['global'].constants.NetworkAdapterType_I82543GC)): return "e1000" if natype == ctx['global'].constants.NetworkAdapterType_Virtio: return "virtio" if natype == ctx['global'].constants.NetworkAdapterType_Null: return None raise Exception("Unknown adapter type: "+natype) def portForwardCmd(ctx, args): if len(args) != 5: print("usage: portForward ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 adapterNum = int(args[2]) hostPort = int(args[3]) guestPort = int(args[4]) proto = "TCP" session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine adapter = mach.getNetworkAdapter(adapterNum) adapterType = getAdapterType(ctx, adapter.adapterType) profile_name = proto+"_"+str(hostPort)+"_"+str(guestPort) config = "VBoxInternal/Devices/" + adapterType + "/" config = config + str(adapter.slot) +"/LUN#0/Config/" + profile_name mach.setExtraData(config + "/Protocol", proto) mach.setExtraData(config + "/HostPort", str(hostPort)) mach.setExtraData(config + "/GuestPort", str(guestPort)) mach.saveSettings() session.unlockMachine() return 0 def showLogCmd(ctx, args): if len(args) < 2: print("usage: showLog ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 log = 0 if len(args) > 2: log = args[2] uOffset = 0 while True: data = mach.readLog(log, uOffset, 4096) if len(data) == 0: break # print adds either NL or space to chunks not ending with a NL sys.stdout.write(str(data)) uOffset += len(data) return 0 def findLogCmd(ctx, args): if len(args) < 3: print("usage: findLog ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 log = 0 if len(args) > 3: log = args[3] pattern = args[2] uOffset = 0 while True: # to reduce line splits on buffer boundary data = mach.readLog(log, uOffset, 512*1024) if len(data) == 0: break buf = str(data).split("\n") for line in buf: match = re.findall(pattern, line) if len(match) > 0: for cur_match in match: line = line.replace(cur_match, colored(cur_match, 'red')) print(line) uOffset += len(data) return 0 def findAssertCmd(ctx, args): if len(args) < 2: print("usage: findAssert ") return 0 mach = argsToMach(ctx, args) if not mach: return 0 log = 0 if len(args) > 2: log = args[2] uOffset = 0 ere = re.compile(r'(Expression:|\!\!\!\!\!\!)') active = False context = 0 while True: # to reduce line splits on buffer boundary data = mach.readLog(log, uOffset, 512*1024) if len(data) == 0: break buf = str(data).split("\n") for line in buf: if active: print(line) if context == 0: active = False else: context = context - 1 continue match = ere.findall(line) if len(match) > 0: active = True context = 50 print(line) uOffset += len(data) return 0 def evalCmd(ctx, args): expr = ' '.join(args[1:]) try: exec(expr) # pylint: disable=exec-used except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() return 0 def reloadExtCmd(ctx, _args): # maybe will want more args smartness checkUserExtensions(ctx, commands, getHomeFolder(ctx)) autoCompletion(commands, ctx) return 0 def runScriptCmd(ctx, args): if len(args) != 2: print("usage: runScript