VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 98655

Last change on this file since 98655 was 98655, checked in by vboxsync, 22 months ago

ValKit: Pylint 2.16.2 adjustments.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 90.8 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 98655 2023-02-20 15:05:40Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2023 Oracle and/or its affiliates.
11
12This file is part of VirtualBox base platform packages, as
13available from https://www.virtualbox.org.
14
15This program is free software; you can redistribute it and/or
16modify it under the terms of the GNU General Public License
17as published by the Free Software Foundation, in version 3 of the
18License.
19
20This program is distributed in the hope that it will be useful, but
21WITHOUT ANY WARRANTY; without even the implied warranty of
22MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23General Public License for more details.
24
25You should have received a copy of the GNU General Public License
26along with this program; if not, see <https://www.gnu.org/licenses>.
27
28The contents of this file may alternatively be used under the terms
29of the Common Development and Distribution License Version 1.0
30(CDDL), a copy of it is provided in the "COPYING.CDDL" file included
31in the VirtualBox distribution, in which case the provisions of the
32CDDL are applicable instead of those of the GPL.
33
34You may elect to license modified versions of this file under the
35terms and conditions of either the GPL or the CDDL or both.
36
37SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
38"""
39__version__ = "$Revision: 98655 $"
40
41# Standard Python imports.
42import array;
43import errno;
44import os;
45import select;
46import socket;
47import sys;
48import threading;
49import time;
50import zlib;
51import uuid;
52
53# Validation Kit imports.
54from common import utils;
55from testdriver import base;
56from testdriver import reporter;
57from testdriver.base import TdTaskBase;
58
# Python 3 compatibility: there is no separate 'long' type any more, so alias
# it to int for the isinstance() checks and conversions further down.
if sys.version_info >= (3,):
    long = int  # pylint: disable=redefined-builtin,invalid-name
62
63#
64# Helpers for decoding data received from the TXS.
65# These are used by both the Session and Transport classes.
66#
67
def getU32(abData, off):
    """
    Get a U32 field.

    Decodes the four bytes starting at offset off in abData as an unsigned
    little endian 32-bit value and returns it.
    """
    u32 = 0
    # Walk from the most significant byte (off + 3) down to the least (off).
    for iByte in range(3, -1, -1):
        u32 = u32 * 256 + abData[off + iByte]
    return u32
74
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    abData is the byte array holding the field, off the offset of its first
    byte.  The bytes up to (not including) the terminator are decoded as UTF-8.

    Returns the decoded string.
    Returns sDefault if the string is invalid (unterminated, offset beyond the
    data, or not decodable - the latter is logged).
    """
    cchStr = getSZLen(abData, off)
    if cchStr < 0:
        return sDefault

    abStr = abData[off:(off + cchStr)]
    try:
        # array.tostring() was removed in Python 3.9, use tobytes() from then on.
        if sys.version_info >= (3, 9, 0):
            abRaw = abStr.tobytes()
        else:
            abRaw = abStr.tostring()  # pylint: disable=no-member
        return abRaw.decode('utf_8')
    except:
        reporter.errorXcpt('getSZ(,%u)' % (off))
    return sDefault
93
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    The length does not include the terminator.

    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    cbData = len(abData)
    if off >= cbData:
        return -1

    # Scan forward for the terminator, giving up at the end of the data.
    for offCur in range(off, cbData):
        if abData[offCur] == 0:
            return offCur - off
    return -1
110
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be uppercase letters, the remaining six may be uppercase letters,
    digits, dash, underscore or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ "
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)))
        return False
    # Check every position.  The previous ranges (0..0 and 2..6) skipped
    # characters #1 and #7, letting arbitrary characters through there.
    for i, ch in enumerate(sOpcode):
        sValidChars = sSet1 if i < 2 else sSet2
        if ch not in sValidChars:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode))
            return False
    return True
131
132#
133# Helper for encoding data sent to the TXS.
134#
135
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Only the low 32 bits of the value are used.  Returns a four element
    array.array('B'), least significant byte first.
    """
    return array.array('B', [(u32 >> cShift) & 0xff for cShift in (0, 8, 16, 24)])
143
def escapeString(sString):
    """
    Does $ escaping of the string so TXS doesn't try do variable expansion.

    Returns a copy of sString with every '$' doubled.
    """
    return '$$'.join(sString.split('$'))
149
150
151
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the connection and byte level methods (connect,
    disconnect, sendBytes, recvBytes, isConnectionOk, isRecvPending).  The
    message framing in sendMsgInt, recvMsg and sendMsg is shared:

        - 16 byte header: u32 message length (little endian, padding not
          included), u32 CRC-32 of opcode + payload, 8 character opcode.
        - Optional payload.
        - Zero padding up to the next multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller)
        self.fDummy = 0
        # Header of a message whose payload could not be received; recvMsg
        # reuses it on the next call instead of reading a new header.
        self.abReadAheadHdr = array.array('B')

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated)

    def __str__(self):
        return self.toString()

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet
        return True

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout
        return False

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk
        return None

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode is the opcode, 2 to 8 characters (space padded to 8 here).
        abPayload is the already encoded payload as array('B'); None (the
        default) means no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # A None default avoids sharing one mutable array between all calls.
        if abPayload is None:
            abPayload = array.array('B')

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode))
            return False
        sOpcode = sOpcode.ljust(8)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode))
            return False

        # Start constructing the message: length, CRC placeholder, opcode, payload.
        cbMsg = 16 + len(abPayload)
        abMsg = array.array('B')
        abMsg.extend(u32ToByteArray(cbMsg))
        abMsg.extend((0, 0, 0, 0))  # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]))
            if abPayload:
                abMsg.extend(abPayload)
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...')
            return False

        # Checksum it (opcode + payload, before padding), pad it and send it off.
        uCrc32 = zlib.crc32(abMsg[8:])
        abMsg[4:8] = u32ToByteArray(uCrc32)

        while len(abMsg) % 16:
            abMsg.append(0)

        reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout))
        return self.sendBytes(abMsg, cMsTimeout)

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        If the payload cannot be received, the header is kept and reused by
        the next call.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, or pick up the one left over from a failed attempt.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16)
            abHdr = self.abReadAheadHdr
            self.abReadAheadHdr = array.array('B')
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk)  # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None)
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)))
                return (None, None, None)

        # Unpack and validate the header.
        cbMsg = getU32(abHdr, 0)
        uCrc32 = getU32(abHdr, 4)

        if sys.version_info < (3, 9, 0):
            # Removed since Python 3.9.
            abOpcode = abHdr[8:16].tostring()  # pylint: disable=no-member
        else:
            abOpcode = abHdr[8:16].tobytes()
        try:
            sOpcode = abOpcode.decode('ascii')
        except UnicodeDecodeError:
            # Garbage on the wire must produce the documented failure return,
            # not an exception in the caller.
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abHdr[8:16],))
            return (None, None, None)

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg))
            return (None, None, None)
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg))
            return (None, None, None)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode))
            return (None, None, None)

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B')
        if cbMsg > 16:
            cbPadding = (16 - (cbMsg % 16)) % 16
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False)  # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!')
                return (None, None, None)

            while cbPadding > 0:
                abPayload.pop()
                cbPadding = cbPadding - 1

        # Check the CRC-32 (a zero CRC in the header means "not checksummed").
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:])
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32)
            uActualCrc32 = uActualCrc32 & 0xffffffff
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)))
                return (None, None, None)

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)))
        return (cbMsg, sOpcode, abPayload)

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Each payload item is encoded according to its type: strings as zero
        terminated UTF-8, integers as little endian uint32_t, and array.array
        objects are appended as-is.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B')
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'))
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8')
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0)
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)))
                        return None
                    abPayload.extend(u32ToByteArray(o))
                elif isinstance(o, array.array):
                    abPayload.extend(o)
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload))
                    return None
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...')
                return None
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload)
408
409
410class Session(TdTaskBase):
411 """
412 A Test eXecution Service (TXS) client session.
413 """
414
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    This starts by connecting to the TXS and will enter the signalled state
    when connected or the timeout has been reached.

    oTransport is the transport instance to talk through, cMsTimeout the
    connect timeout in milliseconds and cMsIdleFudge is passed on to the
    connect task.  fTryConnect is stored for the connect and greeting code.

    Raises base.GenError if the connect task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents)

    # Connection state.
    self.oTransport = oTransport
    self.fTryConnect = fTryConnect
    self.fScrewedUpMsgState = False

    # State of the current task ("" status means no task).
    self.sStatus = ""
    self.oThread = None
    self.fnTask = self.taskDummy
    self.aTaskArgs = None
    self.oTaskRc = None
    self.t3oReply = (None, None, None)

    # Timeout bookkeeping and error reporting mode.
    self.cMsTimeout = 0
    self.msStart = 0
    self.fErr = True  # Whether to report errors as error.

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,))
    if not fStarted:
        raise base.GenError("startTask failed")
438
def __del__(self):
    """Cancels any task still running when the session object is deleted."""
    self.cancelTask()
442
def toString(self):
    """Stringify the session state for logging and debugging."""
    sFormat = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
              ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>'
    aoValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    )
    return sFormat % aoValues
448
def taskDummy(self):
    """
    Place holder to catch broken state handling.

    This is the initial fnTask value; it always raises Exception.
    """
    raise Exception()
452
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks of a new task.

    cMsTimeout:         The task timeout in milliseconds.  Values less than
                        500 ms will be adjusted to 500 ms.  This means it is
                        OK to use negative value.
    fIgnoreErrors:      If set, problems are not reported as errors.
    sStatus:            The task status.
    fnTask:             The method that'll execute the task.
    aArgs:              Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.')
        return False

    # Change status and make sure we're the only one setting up a task: only
    # an idle session (empty status) may enter the "setup" state.
    self.lockTask()
    if self.sStatus != "":
        self.unlockTask()
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.')
        return False
    self.sStatus = "setup"
    self.oTaskRc = None
    self.t3oReply = (None, None, None)
    self.resetTaskLocked()
    self.unlockTask()

    self.cMsTimeout = max(cMsTimeout, 500)
    self.fErr = not fIgnoreErrors
    self.fnTask = fnTask
    self.aTaskArgs = aArgs
    self.oThread = threading.Thread(target=self.taskThread, args=(), name='TXS-%s' % (sStatus))
    # Thread.setDaemon() is deprecated (Python 3.10); the attribute works on
    # all Python versions having the threading module in its current form.
    self.oThread.daemon = True
    self.msStart = base.timestampMilli()

    # Publish the real status before the thread starts running.
    self.lockTask()
    self.sStatus = sStatus
    self.unlockTask()
    self.oThread.start()

    return True
496
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    With fSync set (the default) this waits up to 61 seconds for the task
    thread to terminate.

    Returns success indicator (True/False):
        - True if there is no task, or the task thread has terminated.
        - False if a task is being set up, a cancellation is already in
          progress, fSync is False, or the thread is still running after
          the wait.
    """
    self.lockTask()
    try:
        if self.sStatus == "":
            return True
        if self.sStatus in ("setup", "cancelled"):
            return False

        reporter.log('txsclient: cancelling "%s"...' % (self.sStatus))
        if self.sStatus == 'connecting':
            self.oTransport.cancelConnect()

        self.sStatus = "cancelled"
        oThread = self.oThread
    finally:
        # Release the lock on every path, including if logging raises.
        self.unlockTask()

    if not fSync:
        return False

    oThread.join(61.0)

    # Success means the thread is gone.  The previous code returned the
    # is-alive state itself, i.e. True when the cancellation had failed.
    return not oThread.is_alive()
531
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call.

    Runs self.fnTask with self.aTaskArgs unless the task was cancelled before
    the thread got going, then stores the result, resets the status to idle
    and signals the task.  Exceptions from the task method are logged and
    turned into a None result.
    """
    # Must be defined on the "cancelled already" path too, otherwise the
    # logging below raises UnboundLocalError and the task is never signalled.
    oTaskRc = None
    if not self.isCancelled():
        try:
            fnTask = self.fnTask
            oTaskRc = fnTask(*self.aTaskArgs)
        except:
            reporter.fatalXcpt('taskThread', 15)
            oTaskRc = None
    else:
        reporter.log('taskThread: cancelled already')

    self.lockTask()

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc))
    self.oTaskRc = oTaskRc
    self.oThread = None
    self.sStatus = ''
    self.signalTaskLocked()

    self.unlockTask()
    return None
557
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.

    Returns True if the status is "cancelled", otherwise False.
    """
    self.lockTask()
    sCurStatus = self.sStatus
    self.unlockTask()
    return sCurStatus == "cancelled"
566
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.

    Returns True when no time is left according to getMsLeft, else False.
    """
    return self.getMsLeft() <= 0
573
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout.

    The result is never less than cMsMin.  If cMsMax is positive, the result
    is capped at cMsMax.  Returns the number of milliseconds.
    """
    cMsElapsed = base.timestampMilli() - self.msStart
    cMsLeft = self.cMsTimeout - cMsElapsed
    # A negative elapsed time (clock oddity) is treated like a timeout.
    if cMsElapsed < 0 or cMsLeft <= cMsMin:
        return cMsMin
    if 0 < cMsMax < cMsLeft:
        return cMsMax
    return cMsLeft
585
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    so the client can inspect it later on.

    When cMsTimeout is None, the time left of the task timeout is used
    (at least 500 ms).

    Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500)

    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk)
    t3oReply = (cbMsg, sOpcode, abPayload)

    self.lockTask()
    self.t3oReply = t3oReply
    self.unlockTask()
    return t3oReply
598
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode itself
    if the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk)
    if cbMsg is None:
        return False

    sStripped = sOpcode.strip()
    if sStripped != "ACK":
        return (sStripped, getSZ(abPayload, 0, sStripped))
    return True
615
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    sCommand is only used in the log messages.  When fNoDataOk is set,
    nothing is logged and the recvAck result is returned unmodified.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    """
    rc = self.recvAck(fNoDataOk)
    if rc is True or fNoDataOk:
        return rc

    if rc is False:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand))
    else:
        # rc is the (opcode, details) tuple of a TXS failure reply.
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]))
    return False
630
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand is only used in the log messages.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply()
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand))
        return None

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True
    if sOpcode == "FALSE":
        return False
    # The log prefix used to be the copy & pasted 'recvAckLogged', which
    # pointed at the wrong method when reading logs.
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s'
                      % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)))
    return None
648
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    When cMsTimeout is None, the time left of the task timeout is used
    (at least 500 ms).  Returns whatever the transport's sendMsg returns.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload)
656
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    Calls fnAsync(*aArgs) to start the task, then waits for it to complete
    (task timeout plus 5 seconds).  The task is cancelled if the wait fails.

    Returns False on failure, task return status on success.
    """
    if fnAsync(*aArgs) is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync))
        return False

    if self.waitForTask(self.cMsTimeout + 5000) is False:
        reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,))
        self.cancelTask()
        return False

    return self.getResult()
678
679 #
680 # Connection tasks.
681 #
682
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS.

    Retries the transport connect until it succeeds, fails fatally, the task
    times out or is cancelled.  On success the TXS is greeted (taskGreet).

    Returns the taskGreet result on successful connect.
    Returns None on a fatal connect error.
    Returns False on timeout or cancellation.
    """
    while True:
        if self.isCancelled():
            break

        reporter.log2('taskConnect: connecting ...')
        rc = self.oTransport.connect(self.getMsLeft(500))
        if rc is True:
            reporter.log('taskConnect: succeeded')
            return self.taskGreet(cMsIdleFudge)
        if rc is None:
            reporter.log2('taskConnect: unable to connect')
            return None

        # Retryable failure: give up on timeout, otherwise pause up to a second.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out')
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out')
            return False
        time.sleep(self.getMsLeft(1, 1000) / 1000.0)

    if not self.fTryConnect:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled')
    return False
703
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS.

    Sends HOWDY and waits for the ACK.  On success it sleeps one second per
    started 1000 ms of cMsIdleFudge; on failure the transport is
    disconnected.

    Returns True on success, otherwise the failing send/receive status.
    """
    rc = self.sendMsg("HOWDY", ())
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect)

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect)
        return rc

    cMsRemaining = cMsIdleFudge
    while cMsRemaining > 0:
        cMsRemaining -= 1000
        time.sleep(1)
    return rc
716
def taskBye(self):
    """
    Says goodbye to the TXS.

    Sends BYE, waits for the ACK if the send worked, and disconnects the
    transport in any case.  Returns the send/receive status.
    """
    rc = self.sendMsg("BYE")
    rc = self.recvAckLogged("BYE") if rc is True else rc
    self.oTransport.disconnect()
    return rc
724
def taskVer(self):
    """
    Requests version information from TXS.

    Returns the version string on success.
    Returns False if the reply is missing or bad (logged), or the status of
    sendMsg if sending the request fails.
    """
    rc = self.sendMsg("VER")
    if rc is not True:
        return rc

    cbMsg, sOpcode, abPayload = self.recvReply()
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.')
        return False

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK VER":
        reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,))
        return False

    sVer = getSZ(abPayload, 0)
    if sVer is None:
        return False
    return sVer
742
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply is missing, bad or holds an invalid UUID
    (logged), or the status of sendMsg if sending the request fails.
    """
    rc = self.sendMsg("UUID")
    if rc is not True:
        return rc

    cbMsg, sOpcode, abPayload = self.recvReply()
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.')
        return False

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,))
        return False

    sUuid = getSZ(abPayload, 0)
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.')
        return False

    # Validate the string by letting the uuid module parse it.
    sUuid = '{%s}' % (sUuid,)
    try:
        _ = uuid.UUID(sUuid)
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,))
        return False
    return sUuid
767
768 #
769 # Process task
770 # pylint: disable=missing-docstring
771 #
772
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program on the TXS side and shuffles the standard streams.

    sExecName, fFlags, asArgs, asAddEnv and sAsUser are encoded into the EXEC
    request.  oStdIn, oStdOut, oStdErr and oTestPipe are each either a string
    (sent to the TXS as-is), None (sent as an empty string) or an object
    (sent as '|'): data read from an oStdIn object is forwarded in STDIN
    messages, and STDOUT/STDERR/TESTPIPE data from the TXS is written to the
    corresponding object.  All stream objects still referenced at the end are
    closed.

    Returns True if the process reported 'PROC OK'.
    Returns False if it completed with any other PROC status.
    Returns None on any failure; in most failure cases a fake
    (0, 'EXECFAIL', reason) reply is stored in self.t3oReply.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))]
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg))
    aoPayload.append(long(len(asAddEnv)))
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv))
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o)
        elif o is not None:
            aoPayload.append('|')
            # Running CRC-32 of everything passed through this stream.
            o.uTxsClientCrc32 = zlib.crc32(b'')
        else:
            aoPayload.append('')
    aoPayload.append('%s' % (sAsUser))
    aoPayload.append(long(self.cMsTimeout))

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC')
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure = ""
        msPendingInputReply = None
        cbMsg, sOpcode, abPayload = (None, None, None)
        while True:
            # Pending input?  Only one STDIN/STDINEOS message is in flight
            # at a time (msPendingInputReply is set until it is answered).
            if     msPendingInputReply is None \
               and oStdIn is not None \
               and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536)
                except:
                    reporter.errorXcpt('read standard in')
                    sFailure = 'exception reading stdin'
                    rc = None
                    break
                if sInput:
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).  Data that already is bytes (binary mode stream) is
                    # used as-is, only text needs encoding.
                    if isinstance(sInput, (bytes, bytearray)):
                        abInput = array.array('B', sInput)
                    else:
                        abInput = array.array('B', sInput.encode('utf-8'))
                    oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32)
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput))
                    if rc is not True:
                        sFailure = 'sendMsg failure'
                        break
                    msPendingInputReply = base.timestampMilli()
                    continue

                # End of input, tell the TXS.
                rc = self.sendMsg('STDINEOS')
                oStdIn = None
                if rc is not True:
                    sFailure = 'sendMsg failure'
                    break
                msPendingInputReply = base.timestampMilli()

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True)
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if      self.hasTimedOut() \
                    and (   msPendingInputReply is None \
                         or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out')
                    sFailure = 'timeout'
                    rc = None
                    break
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect()
                    sFailure = 'disconnected'
                    rc = False
                    break
                continue

            # Handle the response.
            sOpcode = sOpcode.rstrip()
            if sOpcode == 'STDOUT':
                oOut = oStdOut
            elif sOpcode == 'STDERR':
                oOut = oStdErr
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe
            else:
                oOut = None
            if oOut is not None:
                # Output from the process: u32 stream CRC followed by the data.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg)
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure))
                    rc = None
                    break
                uStreamCrc32 = getU32(abPayload, 0)
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32)
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes: %s)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg,
                           ' '.join(['%02x' % (b,) for b in abPayload]),)
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure))
                    rc = None
                    break
                try:
                    oOut.write(abPayload[4:])
                except:
                    sFailure = 'exception writing %s' % (sOpcode)
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure))
                    rc = None
                    break
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                msPendingInputReply = None
                reporter.log('taskExecEx: Standard input is ignored... why?')
                del oStdIn.uTxsClientCrc32
                oStdIn = '/dev/null'
            elif  sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',) and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                msPendingInputReply = None
                if sOpcode == 'STDINMEM':
                    sFailure = 'TXS is out of memory for std input buffering'
                else:
                    # Don't blame memory for the other STDIN errors.
                    sFailure = 'TXS reported a std input error (%s)' % (sOpcode)
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure))
                rc = None
                break
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True
                break
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode)
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure))
                rc = None
                break
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None)

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000)
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2))
                msPendingInputReply = None
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!')
                self.fScrewedUpMsgState = True

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode == 'PROC OK':
                pass
            else:
                rc = False
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                if sOpcode == 'PROC DOO':
                    reporter.log('taskExecEx: PROC DOO[FUS]: %s' % (abPayload,))
                elif sOpcode.startswith('PROC NOK'):
                    reporter.log('taskExecEx: PROC NOK: rcExit=%s' % (abPayload,))
                elif abPayload and sOpcode.startswith('PROC '):
                    reporter.log('taskExecEx: %s payload=%s' % (sOpcode, abPayload,))

        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...')
                rc = self.sendMsg('ABORT')
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000)
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True
                        break
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload))
                        break
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload))
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect()
                        break

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure)
            rc = None
    else:
        rc = None

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32      # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close()                  # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc))
    return rc
983
984 #
985 # Admin tasks
986 #
987
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd is the command name used for logging.  After a successful ACK this
    polls for up to 5 seconds for the connection to go bad or for something
    to become pending on it, then disconnects the transport.

    Returns the recvAckLogged status (True on ACK).
    """
    rc = self.recvAckLogged(sCmd)
    if rc is True:
        # Poll a little while for server to disconnect.  The elapsed time
        # check used to be inverted (>= 5000), so this loop never ran.
        uMsStart = base.timestampMilli()
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break
        self.oTransport.disconnect()
    return rc
1000
def taskReboot(self):
    """
    Asks the TXS to reboot and waits for the ACK.

    Returns the ACK wait status, or the sendMsg status if sending fails.
    """
    fRc = self.sendMsg('REBOOT')
    if fRc is not True:
        return fRc
    return self.hlpRebootShutdownWaitForAck('REBOOT')
1006
def taskShutdown(self):
    """Send SHUTDOWN; if the send succeeded, return the ACK-wait helper's result, else the send result."""
    rcSend = self.sendMsg('SHUTDOWN');
    if rcSend is not True:
        return rcSend;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
1012
1013 #
1014 # CD/DVD control tasks.
1015 #
1016
1017 ## TODO
1018
1019 #
1020 # File system tasks
1021 #
1022
def taskMkDir(self, sRemoteDir, fMode):
    """Send MKDIR with (fMode, sRemoteDir); return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKDIR');
1028
def taskMkDirPath(self, sRemoteDir, fMode):
    """Send MKDRPATH with (fMode, sRemoteDir); return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKDRPATH');
1034
def taskMkSymlink(self, sLinkTarget, sLink):
    """Send MKSYMLNK with (sLinkTarget, sLink); return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('MKSYMLNK');
1040
def taskRmDir(self, sRemoteDir):
    """Send RMDIR for sRemoteDir; return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('RMDIR', (sRemoteDir,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMDIR');
1046
def taskRmFile(self, sRemoteFile):
    """Send RMFILE for sRemoteFile; return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('RMFILE', (sRemoteFile,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMFILE');
1052
def taskRmSymlink(self, sRemoteSymlink):
    """Send RMSYMLNK for sRemoteSymlink; return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMSYMLNK');
1058
def taskRmTree(self, sRemoteTree):
    """Send RMTREE for sRemoteTree; return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('RMTREE', (sRemoteTree,));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('RMTREE');
1064
def taskChMod(self, sRemotePath, fMode):
    """Send CHMOD with (int(fMode), sRemotePath); return the logged ACK result, or the send result if sending failed."""
    aoPayload = (int(fMode), sRemotePath,);
    rcSend = self.sendMsg('CHMOD', aoPayload);
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('CHMOD');
1070
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """Send CHOWN with (int(idUser), int(idGroup), sRemotePath); return the logged ACK result, or the send result if sending failed."""
    aoPayload = (int(idUser), int(idGroup), sRemotePath,);
    rcSend = self.sendMsg('CHOWN', aoPayload);
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('CHOWN');
1076
def taskIsDir(self, sRemoteDir):
    """Send ISDIR for sRemoteDir; return the recvTrueFalse() result, or the send result if sending failed."""
    rcSend = self.sendMsg('ISDIR', (sRemoteDir,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISDIR');
1082
def taskIsFile(self, sRemoteFile):
    """Send ISFILE for sRemoteFile; return the recvTrueFalse() result, or the send result if sending failed."""
    rcSend = self.sendMsg('ISFILE', (sRemoteFile,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISFILE');
1088
def taskIsSymlink(self, sRemoteSymlink):
    """Send ISSYMLNK for sRemoteSymlink; return the recvTrueFalse() result, or the send result if sending failed."""
    rcSend = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if rcSend is not True:
        return rcSend;
    return self.recvTrueFalse('ISSYMLNK');
1094
1095 #def "STAT "
1096 #def "LSTAT "
1097 #def "LIST "
1098
def taskCopyFile(self, sSrcFile, sDstFile, fMode, fFallbackOkay):
    """
    Copy a file on the remote side from sSrcFile to sDstFile via CPFILE.

    fFallbackOkay is accepted but currently ignored.  Returns the logged ACK
    result, or the send result if sending failed.
    """
    _ = fFallbackOkay; # Not used yet.
    # Note: With fMode set to 0 the target OS' implementation decides which
    #       file mode the destination file gets (i.e. via umask).
    aoPayload = (int(fMode), sSrcFile, sDstFile,);
    rcSend = self.sendMsg('CPFILE', aoPayload);
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('CPFILE');
1108
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Upload the local file sLocalFile to sRemoteFile on the remote side.

    Returns False (logged) if the local file cannot be opened, otherwise the
    result of taskUploadCommon().
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr.  The finally clause makes sure the
    # file is closed even if the worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        oLocalFile.close();
1126
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Upload sContent as the remote file sRemoteFile.

    The content is wrapped in a minimal object with a read() method and handed
    to taskUploadCommon(), whose result is returned.
    """
    class InStringFile(object): # pylint: disable=too-few-public-methods
        """Read-only, file-like view of a string that hands out successive slices."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """Return at most cbMax further items, or "" once everything was consumed."""
            offStart = self.off;
            cbLeft = len(self.sContent) - offStart;
            if cbLeft == 0:
                return "";
            sChunk = self.sContent[offStart:(offStart + min(cbLeft, cbMax))];
            self.off = offStart + len(sChunk);
            return sChunk;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile, fMode, fFallbackOkay);
1147
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method.  Returns True when the
    whole stream, including the final 'DATA EOF' packet, was acknowledged;
    otherwise the failing result of the command phase or False.
    """
    #
    # Command + ACK.
    #
    # Only use the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode == 0:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            # Plain recvAck() so an 'UNKNOWN' reply can be inspected here
            # before anything is reported as an error.
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                # Neither True nor False: rc is indexed as (opcode, details).
                if rc[0] == 'UNKNOWN' and fFallbackOkay:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
                else:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                # None (as opposed to False) marks a local I/O error and
                # triggers the ABORT below.
                rc = None;
                break;

            # Convert to array - this is silly!
            # NOTE(review): ord() on a text string yields code points, so
            # characters above 255 would not fit the 'B' array - confirm
            # callers only pass byte-sized characters.
            abBuf = array.array('B');
            if utils.isString(sRaw):
                for ch in sRaw:
                    abBuf.append(ord(ch));
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.  The CRC is the
            # running value over everything sent so far; an empty read means
            # end of stream and is sent as 'DATA EOF' with just the CRC.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.  In the code above only the
        # failed local read leaves rc as None.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1227
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Download the remote file sRemoteFile into the local file sLocalFile.

    Returns False (logged) if the local file cannot be created, otherwise the
    result of taskDownloadCommon().  The local file is removed again when
    that result is False.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Close the file even if the worker raises, so the handle never leaks.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        # Don't leave a partial or empty file behind on failure.
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1244
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Download the remote file sRemoteFile and return its content as a string.

    sEncoding is the codec used for decoding byte chunks; with
    fIgnoreEncodingErrors set undecodable bytes are dropped ('ignore'),
    otherwise decoding is 'strict'.  Returns the decoded string when
    taskDownloadCommon() returned True, else that result unchanged.
    """
    # Collects whatever the download worker writes.
    class OutStringFile(object): # pylint: disable=too-few-public-methods
        """Write-only, file-like object that stores each written chunk."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            self.asContent.append(sBuf);
            return None;

    oLocalString = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
    if rc is True:
        sErrors = 'ignore' if fIgnoreEncodingErrors else 'strict';
        asChunks = oLocalString.asContent;
        if all(hasattr(sBuf, 'decode') for sBuf in asChunks):
            # Join first, decode once: a multi-byte character may be split
            # across two chunks, and decoding each chunk on its own would
            # drop it ('ignore') or raise ('strict').
            rc = b''.join(asChunks).decode(sEncoding, sErrors);
        else:
            # Mixed content: keep decoding piece by piece.
            rc = ''.join([sBuf.decode(sEncoding, sErrors) if hasattr(sBuf, 'decode') else sBuf
                          for sBuf in asChunks]);
    return rc;
1265
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    oLocalFile only needs a write() method.  Returns the result of the
    final ACK send on a completed transfer and False on an error reply,
    validation error or local write error.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.  Each payload starts with a 4 byte CRC; an EOF packet
            # carries nothing else.
            sOpcode = sOpcode.rstrip();
            if sOpcode not in ('DATA', 'DATA EOF',):
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).  The local CRC is a
            # running value over all data bytes received so far.
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                if sys.version_info < (3, 9, 0):
                    # array.tostring() was removed in Python 3.9.
                    abData = abPayload[4:].tostring();
                else:
                    abData = abPayload[4:].tobytes();
                oLocalFile.write(abData);
            except:
                # NOTE(review): the message names the remote file although the
                # failing write targets the local side.
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.  rc is None for exactly
        # those cases above; the result of sending NACK is discarded.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1329
def taskPackFile(self, sRemoteFile, sRemoteSource):
    """Send PKFILE with (sRemoteFile, sRemoteSource); return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('PKFILE');
1335
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """Send UNPKFILE with (sRemoteFile, sRemoteDir); return the logged ACK result, or the send result if sending failed."""
    rcSend = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('UNPKFILE');
1341
def taskExpandString(self, sString):
    """
    Ask the TXS to expand sString.

    Returns the expanded string from a STRING reply, False when the reply is
    missing, of another kind or carries no string, and the send result if
    sending failed.
    """
    rcSend = self.sendMsg('EXP STR ', (sString,));
    if rcSend is not True:
        return rcSend;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskExpandString got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "STRING":
        # Also handles SHORTSTR reply (not enough space to store result).
        reporter.maybeErr(self.fErr, 'taskExpandString got a bad reply: %s' % (sOpcode,));
        return False;

    sStringExp = getSZ(abPayload, 0);
    if sStringExp is None:
        return False;
    return sStringExp;
1358
1359 # pylint: enable=missing-docstring
1360
1361
1362 #
1363 # Public methods - generic task queries
1364 #
1365
def isSuccess(self):
    """
    Check whether the task completed successfully.

    Returns True only when the status string is empty and the task result is
    neither False nor None; otherwise False.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return sStatus == "" and oTaskRc is not False and oTaskRc is not None;
1377
def getResult(self):
    """
    Fetch the result of a completed task.

    Returns None while the status string is non-empty (not completed yet) or
    when there was no previous task, otherwise the stored task result.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return oTaskRc if sStatus == "" else None;
1390
def getLastReply(self):
    """
    Fetch the last reply as a (cbMsg, sOpcode, abPayload) three-tuple.

    A (None, None, None) three-tuple means there was no last reply.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1400
1401 #
1402 # Public methods - connection.
1403 #
1404
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start the disconnect ("bye") task.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1414
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect(), run through asyncToSync()."""
    fnAsync = self.asyncDisconnect;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1418
def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start the task that fetches the TXS version information.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields the version string on success and False on failure.
    """
    fnTask = self.taskVer;
    return self.startTask(cMsTimeout, fIgnoreErrors, "ver", fnTask);
1428
def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncVer(), run through asyncToSync()."""
    fnAsync = self.asyncVer;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1432
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start the task that fetches the TXS UUID.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields the UUID string (in {}) on success and False on failure.
    """
    fnTask = self.taskUuid;
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", fnTask);
1442
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid(), run through asyncToSync()."""
    fnAsync = self.asyncUuid;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1446
1447 #
1448 # Public methods - execution.
1449 #
1450
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Start an exec-process task.

    Returns True if the task was started, False (logged) if not.

    The task yields True if the process exited normally with status code 0,
    None on failure before the process was executed, and False if it exited
    with another status or abnormally.  None and False are logged, and
    getLastReply() gives further details.

    oStdIn, oStdOut, oStdErr and oTestPipe select the stream handling:
        - None: no special action, the stream goes wherever the TXS sends
          its own output (ditto for input).
        - '/dev/null': send to / read from the bitbucket.
        - A remote filename: redirect to/from that file.
        - '>>' followed by a remote filename: append to that file.
        - A file like object: pipe the stream to/from the TXS.  StdIn
          needs a non-blocking read() method, the others a write() method.
          Watch out for deadlocks between StdIn and StdOut/StdErr/TestPipe.
    """
    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                  oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1479
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExecEx(); note that all four streams default to '/dev/null' here."""
    aoArgs = (sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
              oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExecEx, *aoArgs);
1487
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Start an exec-process test task.

    Returns True if the task was started, False (logged) if not.

    The task yields True if the process exited normally with status code 0,
    None on failure before the process was executed, and False if it exited
    with another status or abnormally.  None and False are logged, and
    getLastReply() gives further details.

    Standard input is taken from /dev/null.  Standard output and standard
    error go to reporter file wrappers named '<sPrefix>stdout' and
    '<sPrefix>stderr'.  The test pipe goes to a reporter test pipe wrapper
    when fWithTestPipe is set, otherwise to /dev/null.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1514
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec(), run through asyncToSync()."""
    aoArgs = (sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExec, *aoArgs);
1520
1521 #
1522 # Public methods - system
1523 #
1524
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a reboot task.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged); on
    successful completion the session will be disconnected.
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, aoTaskArgs);
1535
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot(), run through asyncToSync()."""
    fnAsync = self.asyncReboot;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1539
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a shutdown task.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, aoTaskArgs);
1549
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown(), run through asyncToSync()."""
    fnAsync = self.asyncShutdown;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1553
1554
1555 #
1556 # Public methods - file system
1557 #
1558
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a mkdir task for sRemoteDir with mode fMode.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, aoTaskArgs);
1568
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDir(), run through asyncToSync()."""
    fnAsync = self.asyncMkDir;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1572
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a mkdir -p task for sRemoteDir with mode fMode.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, aoTaskArgs);
1582
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDirPath(), run through asyncToSync()."""
    fnAsync = self.asyncMkDirPath;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1586
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a symlink task creating sLink pointing at sLinkTarget.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, aoTaskArgs);
1596
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink(), run through asyncToSync()."""
    fnAsync = self.asyncMkSymlink;
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1600
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a rmdir task for sRemoteDir.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, aoTaskArgs);
1610
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir(), run through asyncToSync()."""
    fnAsync = self.asyncRmDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1614
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a rmfile task for sRemoteFile.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, aoTaskArgs);
1624
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile(), run through asyncToSync()."""
    fnAsync = self.asyncRmFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1628
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a rmsymlink task for sRemoteSymlink.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, aoTaskArgs);
1638
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink(), run through asyncToSync()."""
    fnAsync = self.asyncRmSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1642
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a rmtree task for sRemoteTree.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, aoTaskArgs);
1652
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree(), run through asyncToSync()."""
    fnAsync = self.asyncRmTree;
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors);
1656
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a chmod task setting fMode on sRemotePath.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemotePath, fMode);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, aoTaskArgs);
1666
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncChMod(), run through asyncToSync()."""
    fnAsync = self.asyncChMod;
    return self.asyncToSync(fnAsync, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1670
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a chown task setting idUser/idGroup on sRemotePath.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemotePath, idUser, idGroup);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, aoTaskArgs);
1680
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn(), run through asyncToSync().

    Wraps asyncChOwn.  Wrapping asyncChMod instead would hand idUser over as
    the file mode and shift every later argument by one position.
    """
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1684
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start an is-dir query task for sRemoteDir.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True if it's a directory, False if it isn't, and None on
    error (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, aoTaskArgs);
1695
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir(), run through asyncToSync()."""
    fnAsync = self.asyncIsDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1699
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start an is-file query task for sRemoteFile.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True if it's a file, False if it isn't, and None on error
    (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, aoTaskArgs);
1710
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile(), run through asyncToSync()."""
    fnAsync = self.asyncIsFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1714
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start an is-symbolic-link query task for sRemoteSymlink.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True if it's a symbolic link, False if it isn't, and None
    on error (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, aoTaskArgs);
1725
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink(), run through asyncToSync()."""
    fnAsync = self.asyncIsSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1729
1730 #def "STAT "
1731 #def "LSTAT "
1732 #def "LIST "
1733
1734 @staticmethod
1735 def calcFileXferTimeout(cbFile):
1736 """
1737 Calculates a reasonable timeout for an upload/download given the file size.
1738
1739 Returns timeout in milliseconds.
1740 """
1741 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1742
1743 @staticmethod
1744 def calcUploadTimeout(sLocalFile):
1745 """
1746 Calculates a reasonable timeout for an upload given the file (will stat it).
1747
1748 Returns timeout in milliseconds.
1749 """
1750 try: cbFile = os.path.getsize(sLocalFile);
1751 except: cbFile = 1024*1024;
1752 return Session.calcFileXferTimeout(cbFile);
1753
def asyncCopyFile(self, sSrcFile, sDstFile,
                  fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a task copying sSrcFile to sDstFile on the remote.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sSrcFile, sDstFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "cpfile", self.taskCopyFile, aoTaskArgs);
1765
def syncCopyFile(self, sSrcFile, sDstFile, fMode = 0, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncCopyFile(), run through asyncToSync().

    asyncCopyFile() takes fFallbackOkay between fMode and cMsTimeout.  Its
    default (True) is passed explicitly so that cMsTimeout and fIgnoreErrors
    reach the parameters they are meant for.
    """
    fFallbackOkay = True;
    return self.asyncToSync(self.asyncCopyFile, sSrcFile, sDstFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1769
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a task uploading the local file sLocalFile to sRemoteFile.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, aoTaskArgs);
1781
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadFile().

    A cMsTimeout of zero or less is replaced by calcUploadTimeout(sLocalFile).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcUploadTimeout(sLocalFile);
    aoArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsEffective, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadFile, *aoArgs);
1787
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Start a task uploading sContent as the remote file sRemoteFile.

    A cMsTimeout of zero or less is replaced by
    calcFileXferTimeout(len(sContent)).

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcFileXferTimeout(len(sContent));
    aoTaskArgs = (sContent, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsEffective, fIgnoreErrors, "uploadString", self.taskUploadString, aoTaskArgs);
1801
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadString().

    A cMsTimeout of zero or less is replaced by
    calcFileXferTimeout(len(sContent)).
    """
    cMsEffective = cMsTimeout if cMsTimeout > 0 else self.calcFileXferTimeout(len(sContent));
    aoArgs = (sContent, sRemoteFile, fMode, fFallbackOkay, cMsEffective, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadString, *aoArgs);
1807
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Start a task downloading sRemoteFile into the local file sLocalFile.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, aoTaskArgs);
1817
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile(), run through asyncToSync()."""
    fnAsync = self.asyncDownloadFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1821
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Start a task downloading sRemoteFile as a string.

    sEncoding and fIgnoreEncodingErrors are passed on to taskDownloadString,
    which uses them to decode the downloaded data.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields the decoded string on success and False on failure
    (logged).
    """
    aoTaskArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, aoTaskArgs);
1833
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString(), run through asyncToSync()."""
    aoArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadString, *aoArgs);
1839
def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Start a task packing the remote file/directory sRemoteSource into sRemoteFile.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sRemoteSource);
    return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile, aoTaskArgs);
1850
def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncPackFile(), run through asyncToSync()."""
    fnAsync = self.asyncPackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1854
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Start a task unpacking the remote file sRemoteFile into sRemoteDir.

    Returns True if the task was started, False (logged) if not.  The task
    itself yields True on success and False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, aoTaskArgs);
1865
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile(), run through asyncToSync()."""
    fnAsync = self.asyncUnpackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1869
def asyncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Starts the "expandString" task, i.e. taskExpandString with (sString,)
    as its arguments.

    Returns whatever startTask returns; the sibling async* methods document
    that as True on success, False on failure (logged).

    NOTE(review): the task presumably yields the expanded string on success
    and False on failure (logged) - confirm against taskExpandString.
    """
    tTaskArgs = (sString,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "expandString", self.taskExpandString, tTaskArgs);
1880
def syncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
    """ Synchronous variant of asyncExpandString; runs it through asyncToSync. """
    fnAsync = self.asyncExpandString;
    return self.asyncToSync(fnAsync, sString, cMsTimeout, fIgnoreErrors);
1884
1885
1886class TransportTcp(TransportBase):
1887 """
1888 TCP transport layer for the TXS client session class.
1889 """
1890
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Stores the connection parameters and resets the connection state.

    No socket is created here; the session will call us back to make the
    connection later on its worker thread.
    """
    TransportBase.__init__(self, utils.getCallerName());

    # Fall back on the default port when none is given: 5042 when
    # fReversedSetup is False, 5048 otherwise.
    if uPort is None:
        if fReversedSetup is False:
            uPort = 5042;
        else:
            uPort = 5048;

    # Connection parameters.
    self.sHostname        = sHostname;
    self.uPort            = uPort;
    self.fReversedSetup   = fReversedSetup;

    # Connection state.
    self.oSocket          = None;
    self.oWakeupR         = None;
    self.oWakeupW         = None;
    self.fIsConnecting    = False;
    self.fConnectCanceled = False;
    self.oCv              = threading.Condition();
    self.abReadAhead      = array.array('B');
1907
def toString(self):
    """ Returns a one-line description: TransportBase.toString() followed by the TCP specific members. """
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFormat % tValues;
1913
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    Returns True if oXcpt is a socket.error whose errno is EINPROGRESS or
    EWOULDBLOCK, otherwise False.  Never raises.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;

    # EWOULDBLOCK is checked too; presumably that is what Windows reports here.
    for sErrNm in ('EINPROGRESS', 'EWOULDBLOCK'):
        try:
            if oXcpt.errno == getattr(errno, sErrNm):
                return True;
        except:
            pass;
    return False;
1930
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    Returns True if oXcpt is a socket.error whose errno is EWOULDBLOCK or
    EAGAIN, otherwise False.  Never raises.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;

    for sErrNm in ('EWOULDBLOCK', 'EAGAIN'):
        try:
            if oXcpt.errno == getattr(errno, sErrNm):
                return True;
        except:
            pass;
    return False;
1946
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    Returns True if oXcpt is a socket.error whose errno is ECONNRESET or
    ENETRESET, otherwise False.  Never raises.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
    except:
        return False;

    for sErrNm in ('ECONNRESET', 'ENETRESET'):
        try:
            if oXcpt.errno == getattr(errno, sErrNm):
                return True;
        except:
            pass;
    return False;
1962
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets.  Caller should own the CV.

    Each member is reset to None before its socket is closed; the read end
    is handled first, then the write end.  Always returns None.
    """
    oSock, self.oWakeupR = self.oWakeupR, None;
    if oSock is not None:
        oSock.close();

    oSock, self.oWakeupW = self.oWakeupW, None;
    if oSock is not None:
        oSock.close();

    return None;
1976
def cancelConnect(self):
    """
    Flags the connect as canceled and, if a connect is in progress, closes
    the socket and pokes the wakeup socket.

    The whole thing runs with the CV held.  The 'with' statement makes sure
    the CV is released again even if closing one of the sockets raises.
    """
    # This is bad stuff.
    with self.oCv:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            # Detach the socket before closing it.
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            # Write to the wakeup socket so the select() in connect returns
            # (select doesn't wake up on socket close on Linux).
            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                try:    oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try:    oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
1999
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (self.sHostname, self.uPort), listens on it and accepts
    one connection.  When accept() reports the operation as being in
    progress, select() waits up to cMsTimeout milliseconds on oSocket and
    oWakeupR before accept() is tried once more.

    Returns True when done (the accepted socket replaces self.oSocket unless
    the connect was canceled in the meantime), False if the second accept()
    still would block (timeout), and None on failure (logged) or when the
    accept/select was canceled.  Other socket errors from accept()/select()
    are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr   = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcptFirst:
        if not self.__isInProgressXcpt(oXcptFirst):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcptFirst,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcpt:
            # Classify the error of *this* accept call, not of the first one.
            if not self.__isInProgressXcpt(oXcpt):
                if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    with self.oCv:
        if not self.fConnectCanceled:
            self.oSocket.close();
            self.oSocket = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
        else:
            # Canceled: nobody takes ownership of the accepted socket, so
            # close it here rather than leaking it.
            oClientSocket.close();
    return True;
2072
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Issues connect() on oSocket to (self.sHostname, self.uPort).  When the
    connect is reported as being in progress, select() waits up to
    cMsTimeout milliseconds for oSocket to become writable (oWakeupR is
    watched as well) and the outcome is then read from SO_ERROR.

    Returns True on success, False when the error is one of the errno
    values listed below as worth retrying (or whenever SO_ERROR was read and
    is non-zero), and None for other failures.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        # iRc tracks the most recent errno for the classification below.
        iRc = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                # Neither writable nor in the exceptional set: handled like a timeout.
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                # SO_ERROR holds the result of the connect attempt.
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error as oXcpt2:
                iRc = oXcpt2.errno;
            except:
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        # Classify the result.  This is still inside the except handler, so
        # fatalXcpt runs while the connect exception is being handled.
        if rc is True:
            pass;
        elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
            rc = False; # try again.
        else:
            # EBADF after a cancel is expected and therefore not reported.
            if iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
2110
2111
def connect(self, cMsTimeout):
    """
    Creates a non-blocking socket (plus a wakeup socket pair where
    available) and connects via _connectAsServer or _connectAsClient,
    depending on self.fReversedSetup.

    Returns True on success.  Otherwise the helper's result (False or None)
    is returned after the sockets have been closed; True is turned into
    False if the connect got canceled meanwhile.  Returns None if the socket
    could not be created or made non-blocking, and False without trying to
    connect if cancelConnect was called before this method got going.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:    (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
        except: reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    with self.oCv:
        fCanceled = self.fConnectCanceled;
        if not fCanceled:
            self.oSocket       = oSocket;
            self.oWakeupW      = oWakeupW;
            self.oWakeupR      = oWakeupR;
            self.fIsConnecting = True;

    if fCanceled:
        # Already canceled: the sockets were never handed to the object, so
        # nothing else will close them, and the result of a connect attempt
        # would be discarded anyway.
        for oUnused in (oSocket, oWakeupR, oWakeupW):
            if oUnused is not None:
                try:    oUnused.close();
                except: reporter.logXcpt();
        reporter.log2('TransportTcp::connect: returning %s' % (False,));
        return False;

    # Try connect.
    if oWakeupR is None:
        oWakeupR = oSocket; # Avoid select failure.
    rc = None;
    try:
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
    finally:
        # Update the state and cleanup on failure/cancel.  Done in a finally
        # clause because _connectAsServer re-raises some socket errors.
        with self.oCv:
            if rc is True and self.fConnectCanceled:
                rc = False;
            self.fIsConnecting = False;

            if rc is not True:
                if self.oSocket is not None:
                    self.oSocket.close();
                    self.oSocket = None;
                self._closeWakeupSockets();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
2168
def disconnect(self, fQuiet = False):
    """
    Shuts down and closes the socket (if any) and closes the wakeup sockets.

    The read-ahead buffer is discarded when there is a socket.  A failing
    shutdown(SHUT_WR) is logged as an error unless fQuiet is set.  The close
    is done with the CV held; the CV is released and the wakeup sockets are
    closed even if closing the socket raises.
    """
    if self.oSocket is not None:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0); # just in case it's not set.
            sData = "1";
            while sData:
                sData = self.oSocket.recv(16384);
        except:
            pass;

    # Close it.
    with self.oCv:
        try:
            if self.oSocket is not None:
                try:    self.oSocket.setblocking(1);
                except: pass;
                self.oSocket.close();
        finally:
            self.oSocket = None;
            self._closeWakeupSockets();
2197
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf over the socket.

    A plain send() is tried first.  If that is incomplete or would block,
    select() and send() are repeated until everything is out or more than
    cMsTimeout milliseconds have passed.

    Returns True when all bytes were sent.  Returns False (error logged) if
    there is no connection, on send/select failures and on timeout.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbDone = self.oSocket.send(abBuf);
        if cbDone == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbDone = 0;

    # Do a timed send.
    msBegin = base.timestampMilli();
    while True:
        cMsUsed = base.timestampMilli() - msBegin;
        if cMsUsed > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            return False;

        # Wait for the socket to become writable.
        try:
            _, aoWritable, aoFailed = select.select([], [self.oSocket], [self.oSocket],
                                                    (cMsTimeout - cMsUsed) / 1000.0);
            if not aoWritable:
                if aoFailed:
                    reporter.error('TranportTcp.sendBytes: select returned with exception');
                else:
                    reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Try send more.
        try:
            cbDone += self.oSocket.send(abBuf[cbDone:]);
            if cbDone == len(abBuf):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;
2246
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb items from self.abReadAhead and returns them.
    """
    assert(len(self.abReadAhead) >= cb);
    abHead, self.abReadAhead = self.abReadAhead[:cb], self.abReadAhead[cb:];
    return abHead;
2253
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, buffering partial data in self.abReadAhead.

    A recv() without timeout handling is tried first.  If that isn't
    enough, select() and recv() are repeated until cb bytes are buffered or
    more than cMsTimeout milliseconds have passed.

    Returns the cb bytes (as returned by __returnReadAheadBytes) on success
    and None on failure.  Failures are logged as errors, except that with
    fNoDataOk set and nothing buffered, timeouts and connection resets are
    not.  When the peer has shut down the connection, disconnect() is called.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    cbMissing = cb - len(self.abReadAhead);
    if cbMissing > 0:
        try:
            abChunk = self.oSocket.recv(cbMissing);
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msBegin = base.timestampMilli();
    while True:
        cMsUsed = base.timestampMilli() - msBegin;
        if cMsUsed > cMsTimeout:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            return None;

        # Wait for the socket to become readable.
        try:
            aoReadable, _, aoFailed = select.select([self.oSocket], [], [self.oSocket],
                                                    (cMsTimeout - cMsUsed) / 1000.0);
            if not aoReadable:
                if aoFailed:
                    reporter.error('TranportTcp.recvBytes: select returned with exception');
                elif not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Try read more.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);
2321
def isConnectionOk(self):
    """
    Checks the connection without blocking.

    Returns False if there is no socket, if select() lists the socket in its
    exceptional set, or if select() or a zero byte send() raises.  Returns
    True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        _, _, aoFailed = select.select([], [], [self.oSocket], 0.0);
        fOk = not aoFailed;
        if fOk:
            self.oSocket.send(array.array('B')); # send zero bytes.
    except:
        fOk = False;
    return fOk;
2334
def isRecvPending(self, cMsTimeout = 0):
    """
    Waits up to cMsTimeout milliseconds for the socket to become readable.

    Returns False only if select() completes without reporting the socket as
    readable.  Returns True if it is readable, and also if select() raises.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return bool(aoReadable);
2343
2344
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp is created from sHostname, uPort and fReversedSetup and
    wrapped in a Session.  The optional fnProcessEvents callback should be
    set to vbox.processPendingEvents or similar.

    Returns the Session object, or None if creating either object raised
    (logged as an error).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    try:
        oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
        return Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2361
2362
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    This differs from openTcpSession in that it won't log a connection failure
    as an error; the Session is created with fTryConnect = True.

    Returns the Session object, or None if creating the transport or the
    session raised (logged as an error).
    """
    try:
        oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
        return Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette