VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 94526

Last change on this file since 94526 was 94526, checked in by vboxsync, 3 years ago

Validation Kit/unit tests: Explicitly cast fMode to int in taskCopyFile(). ​​bugref:10195

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 90.1 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 94526 2022-04-08 08:17:12Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2022 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 94526 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 if sys.version_info < (3, 9, 0):
75 # Removed since Python 3.9.
76 sStr = abStr.tostring(); # pylint: disable=no-member
77 else:
78 sStr = abStr.tobytes();
79 return sStr.decode('utf_8');
80 except:
81 reporter.errorXcpt('getSZ(,%u)' % (off));
82 return sDefault;
83
84def getSZLen(abData, off):
85 """
86 Get the length of a zero-terminated string field, in bytes.
87 Returns -1 if off is beyond the data packet or not properly terminated.
88 """
89 cbData = len(abData);
90 if off >= cbData:
91 return -1;
92
93 offCur = off;
94 while abData[offCur] != 0:
95 offCur = offCur + 1;
96 if offCur >= cbData:
97 return -1;
98
99 return offCur - off;
100
101def isValidOpcodeEncoding(sOpcode):
102 """
103 Checks if the specified opcode is valid or not.
104 Returns True on success.
105 Returns False if it is invalid, details in the log.
106 """
107 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
108 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
109 if len(sOpcode) != 8:
110 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
111 return False;
112 for i in range(0, 1):
113 if sSet1.find(sOpcode[i]) < 0:
114 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
115 return False;
116 for i in range(2, 7):
117 if sSet2.find(sOpcode[i]) < 0:
118 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
119 return False;
120 return True;
121
122#
123# Helper for encoding data sent to the TXS.
124#
125
126def u32ToByteArray(u32):
127 """Encodes the u32 value as a little endian byte (B) array."""
128 return array.array('B',
129 ( u32 % 256,
130 (u32 // 256) % 256,
131 (u32 // 65536) % 256,
132 (u32 // 16777216) % 256) );
133
134def escapeString(sString):
135 """
136 Does $ escaping of the string so TXS doesn't try do variable expansion.
137 """
138 return sString.replace('$', '$$');
139
140
141
142class TransportBase(object):
143 """
144 Base class for the transport layer.
145 """
146
147 def __init__(self, sCaller):
148 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
149 self.fDummy = 0;
150 self.abReadAheadHdr = array.array('B');
151
152 def toString(self):
153 """
154 Stringify the instance for logging and debugging.
155 """
156 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
157
158 def __str__(self):
159 return self.toString();
160
161 def cancelConnect(self):
162 """
163 Cancels any pending connect() call.
164 Returns None;
165 """
166 return None;
167
168 def connect(self, cMsTimeout):
169 """
170 Quietly attempts to connect to the TXS.
171
172 Returns True on success.
173 Returns False on retryable errors (no logging).
174 Returns None on fatal errors with details in the log.
175
176 Override this method, don't call super.
177 """
178 _ = cMsTimeout;
179 return False;
180
181 def disconnect(self, fQuiet = False):
182 """
183 Disconnect from the TXS.
184
185 Returns True.
186
187 Override this method, don't call super.
188 """
189 _ = fQuiet;
190 return True;
191
192 def sendBytes(self, abBuf, cMsTimeout):
193 """
194 Sends the bytes in the buffer abBuf to the TXS.
195
196 Returns True on success.
197 Returns False on failure and error details in the log.
198
199 Override this method, don't call super.
200
201 Remarks: len(abBuf) is always a multiple of 16.
202 """
203 _ = abBuf; _ = cMsTimeout;
204 return False;
205
206 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
207 """
208 Receive cb number of bytes from the TXS.
209
210 Returns the bytes (array('B')) on success.
211 Returns None on failure and error details in the log.
212
213 Override this method, don't call super.
214
215 Remarks: cb is always a multiple of 16.
216 """
217 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
218 return None;
219
220 def isConnectionOk(self):
221 """
222 Checks if the connection is OK.
223
224 Returns True if it is.
225 Returns False if it isn't (caller should call diconnect).
226
227 Override this method, don't call super.
228 """
229 return True;
230
231 def isRecvPending(self, cMsTimeout = 0):
232 """
233 Checks if there is incoming bytes, optionally waiting cMsTimeout
234 milliseconds for something to arrive.
235
236 Returns True if there is, False if there isn't.
237
238 Override this method, don't call super.
239 """
240 _ = cMsTimeout;
241 return False;
242
243 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
244 """
245 Sends a message (opcode + encoded payload).
246
247 Returns True on success.
248 Returns False on failure and error details in the log.
249 """
250 # Fix + check the opcode.
251 if len(sOpcode) < 2:
252 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
253 return False;
254 sOpcode = sOpcode.ljust(8);
255 if not isValidOpcodeEncoding(sOpcode):
256 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
257 return False;
258
259 # Start construct the message.
260 cbMsg = 16 + len(abPayload);
261 abMsg = array.array('B');
262 abMsg.extend(u32ToByteArray(cbMsg));
263 abMsg.extend((0, 0, 0, 0)); # uCrc32
264 try:
265 abMsg.extend(array.array('B', \
266 ( ord(sOpcode[0]), \
267 ord(sOpcode[1]), \
268 ord(sOpcode[2]), \
269 ord(sOpcode[3]), \
270 ord(sOpcode[4]), \
271 ord(sOpcode[5]), \
272 ord(sOpcode[6]), \
273 ord(sOpcode[7]) ) ) );
274 if abPayload:
275 abMsg.extend(abPayload);
276 except:
277 reporter.fatalXcpt('sendMsgInt: packing problem...');
278 return False;
279
280 # checksum it, padd it and send it off.
281 uCrc32 = zlib.crc32(abMsg[8:]);
282 abMsg[4:8] = u32ToByteArray(uCrc32);
283
284 while len(abMsg) % 16:
285 abMsg.append(0);
286
287 reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
288 return self.sendBytes(abMsg, cMsTimeout);
289
290 def recvMsg(self, cMsTimeout, fNoDataOk = False):
291 """
292 Receives a message from the TXS.
293
294 Returns the message three-tuple: length, opcode, payload.
295 Returns (None, None, None) on failure and error details in the log.
296 """
297
298 # Read the header.
299 if self.abReadAheadHdr:
300 assert(len(self.abReadAheadHdr) == 16);
301 abHdr = self.abReadAheadHdr;
302 self.abReadAheadHdr = array.array('B');
303 else:
304 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
305 if abHdr is None:
306 return (None, None, None);
307 if len(abHdr) != 16:
308 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
309 return (None, None, None);
310
311 # Unpack and validate the header.
312 cbMsg = getU32(abHdr, 0);
313 uCrc32 = getU32(abHdr, 4);
314
315 if sys.version_info < (3, 9, 0):
316 # Removed since Python 3.9.
317 sOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
318 else:
319 sOpcode = abHdr[8:16].tobytes();
320 sOpcode = sOpcode.decode('ascii');
321
322 if cbMsg < 16:
323 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
324 return (None, None, None);
325 if cbMsg > 1024*1024:
326 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
327 return (None, None, None);
328 if not isValidOpcodeEncoding(sOpcode):
329 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
330 return (None, None, None);
331
332 # Get the payload (if any), dropping the padding.
333 abPayload = array.array('B');
334 if cbMsg > 16:
335 if cbMsg % 16:
336 cbPadding = 16 - (cbMsg % 16);
337 else:
338 cbPadding = 0;
339 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
340 if abPayload is None:
341 self.abReadAheadHdr = abHdr;
342 if not fNoDataOk :
343 reporter.log('recvMsg: failed to recv payload bytes!');
344 return (None, None, None);
345
346 while cbPadding > 0:
347 abPayload.pop();
348 cbPadding = cbPadding - 1;
349
350 # Check the CRC-32.
351 if uCrc32 != 0:
352 uActualCrc32 = zlib.crc32(abHdr[8:]);
353 if cbMsg > 16:
354 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
355 uActualCrc32 = uActualCrc32 & 0xffffffff;
356 if uCrc32 != uActualCrc32:
357 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
358 return (None, None, None);
359
360 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
361 return (cbMsg, sOpcode, abPayload);
362
363 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
364 """
365 Sends a message (opcode + payload tuple).
366
367 Returns True on success.
368 Returns False on failure and error details in the log.
369 Returns None if you pass the incorrectly typed parameters.
370 """
371 # Encode the payload.
372 abPayload = array.array('B');
373 for o in aoPayload:
374 try:
375 if utils.isString(o):
376 if sys.version_info[0] >= 3:
377 abPayload.extend(o.encode('utf_8'));
378 else:
379 # the primitive approach...
380 sUtf8 = o.encode('utf_8');
381 for ch in sUtf8:
382 abPayload.append(ord(ch))
383 abPayload.append(0);
384 elif isinstance(o, (long, int)):
385 if o < 0 or o > 0xffffffff:
386 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
387 return None;
388 abPayload.extend(u32ToByteArray(o));
389 elif isinstance(o, array.array):
390 abPayload.extend(o);
391 else:
392 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
393 return None;
394 except:
395 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
396 return None;
397 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
398
399
400class Session(TdTaskBase):
401 """
402 A Test eXecution Service (TXS) client session.
403 """
404
405 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
406 """
407 Construct a TXS session.
408
409 This starts by connecting to the TXS and will enter the signalled state
410 when connected or the timeout has been reached.
411 """
412 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
413 self.oTransport = oTransport;
414 self.sStatus = "";
415 self.cMsTimeout = 0;
416 self.fErr = True; # Whether to report errors as error.
417 self.msStart = 0;
418 self.oThread = None;
419 self.fnTask = self.taskDummy;
420 self.aTaskArgs = None;
421 self.oTaskRc = None;
422 self.t3oReply = (None, None, None);
423 self.fScrewedUpMsgState = False;
424 self.fTryConnect = fTryConnect;
425
426 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
427 raise base.GenError("startTask failed");
428
429 def __del__(self):
430 """Make sure to cancel the task when deleted."""
431 self.cancelTask();
432
433 def toString(self):
434 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
435 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
436 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
437 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
438
439 def taskDummy(self):
440 """Place holder to catch broken state handling."""
441 raise Exception();
442
443 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
444 """
445 Kicks of a new task.
446
447 cMsTimeout: The task timeout in milliseconds. Values less than
448 500 ms will be adjusted to 500 ms. This means it is
449 OK to use negative value.
450 sStatus: The task status.
451 fnTask: The method that'll execute the task.
452 aArgs: Arguments to pass to fnTask.
453
454 Returns True on success, False + error in log on failure.
455 """
456 if not self.cancelTask():
457 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
458 return False;
459
460 # Change status and make sure we're the
461 self.lockTask();
462 if self.sStatus != "":
463 self.unlockTask();
464 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
465 return False;
466 self.sStatus = "setup";
467 self.oTaskRc = None;
468 self.t3oReply = (None, None, None);
469 self.resetTaskLocked();
470 self.unlockTask();
471
472 self.cMsTimeout = max(cMsTimeout, 500);
473 self.fErr = not fIgnoreErrors;
474 self.fnTask = fnTask;
475 self.aTaskArgs = aArgs;
476 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
477 self.oThread.setDaemon(True);
478 self.msStart = base.timestampMilli();
479
480 self.lockTask();
481 self.sStatus = sStatus;
482 self.unlockTask();
483 self.oThread.start();
484
485 return True;
486
487 def cancelTask(self, fSync = True):
488 """
489 Attempts to cancel any pending tasks.
490 Returns success indicator (True/False).
491 """
492 self.lockTask();
493
494 if self.sStatus == "":
495 self.unlockTask();
496 return True;
497 if self.sStatus == "setup":
498 self.unlockTask();
499 return False;
500 if self.sStatus == "cancelled":
501 self.unlockTask();
502 return False;
503
504 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
505 if self.sStatus == 'connecting':
506 self.oTransport.cancelConnect();
507
508 self.sStatus = "cancelled";
509 oThread = self.oThread;
510 self.unlockTask();
511
512 if not fSync:
513 return False;
514
515 oThread.join(61.0);
516
517 if sys.version_info < (3, 9, 0):
518 # Removed since Python 3.9.
519 return oThread.isAlive(); # pylint: disable=no-member
520 return oThread.is_alive();
521
522 def taskThread(self):
523 """
524 The task thread function.
525 This does some housekeeping activities around the real task method call.
526 """
527 if not self.isCancelled():
528 try:
529 fnTask = self.fnTask;
530 oTaskRc = fnTask(*self.aTaskArgs);
531 except:
532 reporter.fatalXcpt('taskThread', 15);
533 oTaskRc = None;
534 else:
535 reporter.log('taskThread: cancelled already');
536
537 self.lockTask();
538
539 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
540 self.oTaskRc = oTaskRc;
541 self.oThread = None;
542 self.sStatus = '';
543 self.signalTaskLocked();
544
545 self.unlockTask();
546 return None;
547
548 def isCancelled(self):
549 """Internal method for checking if the task has been cancelled."""
550 self.lockTask();
551 sStatus = self.sStatus;
552 self.unlockTask();
553 if sStatus == "cancelled":
554 return True;
555 return False;
556
557 def hasTimedOut(self):
558 """Internal method for checking if the task has timed out or not."""
559 cMsLeft = self.getMsLeft();
560 if cMsLeft <= 0:
561 return True;
562 return False;
563
564 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
565 """Gets the time left until the timeout."""
566 cMsElapsed = base.timestampMilli() - self.msStart;
567 if cMsElapsed < 0:
568 return cMsMin;
569 cMsLeft = self.cMsTimeout - cMsElapsed;
570 if cMsLeft <= cMsMin:
571 return cMsMin;
572 if cMsLeft > cMsMax > 0:
573 return cMsMax
574 return cMsLeft;
575
576 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
577 """
578 Wrapper for TransportBase.recvMsg that stashes the response away
579 so the client can inspect it later on.
580 """
581 if cMsTimeout is None:
582 cMsTimeout = self.getMsLeft(500);
583 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
584 self.lockTask();
585 self.t3oReply = (cbMsg, sOpcode, abPayload);
586 self.unlockTask();
587 return (cbMsg, sOpcode, abPayload);
588
589 def recvAck(self, fNoDataOk = False):
590 """
591 Receives an ACK or error response from the TXS.
592
593 Returns True on success.
594 Returns False on timeout or transport error.
595 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
596 and there are always details of some sort or another.
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
599 if cbMsg is None:
600 return False;
601 sOpcode = sOpcode.strip()
602 if sOpcode == "ACK":
603 return True;
604 return (sOpcode, getSZ(abPayload, 0, sOpcode));
605
606 def recvAckLogged(self, sCommand, fNoDataOk = False):
607 """
608 Wrapper for recvAck and logging.
609 Returns True on success (ACK).
610 Returns False on time, transport error and errors signalled by TXS.
611 """
612 rc = self.recvAck(fNoDataOk);
613 if rc is not True and not fNoDataOk:
614 if rc is False:
615 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
616 else:
617 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
618 rc = False;
619 return rc;
620
621 def recvTrueFalse(self, sCommand):
622 """
623 Receives a TRUE/FALSE response from the TXS.
624 Returns True on TRUE, False on FALSE and None on error/other (logged).
625 """
626 cbMsg, sOpcode, abPayload = self.recvReply();
627 if cbMsg is None:
628 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
629 return None;
630
631 sOpcode = sOpcode.strip()
632 if sOpcode == "TRUE":
633 return True;
634 if sOpcode == "FALSE":
635 return False;
636 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
637 return None;
638
639 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
640 """
641 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
642 """
643 if cMsTimeout is None:
644 cMsTimeout = self.getMsLeft(500);
645 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
646
647 def asyncToSync(self, fnAsync, *aArgs):
648 """
649 Wraps an asynchronous task into a synchronous operation.
650
651 Returns False on failure, task return status on success.
652 """
653 rc = fnAsync(*aArgs);
654 if rc is False:
655 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
656 return rc;
657
658 rc = self.waitForTask(self.cMsTimeout + 5000);
659 if rc is False:
660 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
661 self.cancelTask();
662 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
663 return False;
664
665 rc = self.getResult();
666 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
667 return rc;
668
669 #
670 # Connection tasks.
671 #
672
673 def taskConnect(self, cMsIdleFudge):
674 """Tries to connect to the TXS"""
675 while not self.isCancelled():
676 reporter.log2('taskConnect: connecting ...');
677 rc = self.oTransport.connect(self.getMsLeft(500));
678 if rc is True:
679 reporter.log('taskConnect: succeeded');
680 return self.taskGreet(cMsIdleFudge);
681 if rc is None:
682 reporter.log2('taskConnect: unable to connect');
683 return None;
684 if self.hasTimedOut():
685 reporter.log2('taskConnect: timed out');
686 if not self.fTryConnect:
687 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
688 return False;
689 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
690 if not self.fTryConnect:
691 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
692 return False;
693
694 def taskGreet(self, cMsIdleFudge):
695 """Greets the TXS"""
696 rc = self.sendMsg("HOWDY", ());
697 if rc is True:
698 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
699 if rc is True:
700 while cMsIdleFudge > 0:
701 cMsIdleFudge -= 1000;
702 time.sleep(1);
703 else:
704 self.oTransport.disconnect(self.fTryConnect);
705 return rc;
706
707 def taskBye(self):
708 """Says goodbye to the TXS"""
709 rc = self.sendMsg("BYE");
710 if rc is True:
711 rc = self.recvAckLogged("BYE");
712 self.oTransport.disconnect();
713 return rc;
714
715 def taskVer(self):
716 """Requests version information from TXS"""
717 rc = self.sendMsg("VER");
718 if rc is True:
719 rc = False;
720 cbMsg, sOpcode, abPayload = self.recvReply();
721 if cbMsg is not None:
722 sOpcode = sOpcode.strip();
723 if sOpcode == "ACK VER":
724 sVer = getSZ(abPayload, 0);
725 if sVer is not None:
726 rc = sVer;
727 else:
728 reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
729 else:
730 reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
731 return rc;
732
733 def taskUuid(self):
734 """Gets the TXS UUID"""
735 rc = self.sendMsg("UUID");
736 if rc is True:
737 rc = False;
738 cbMsg, sOpcode, abPayload = self.recvReply();
739 if cbMsg is not None:
740 sOpcode = sOpcode.strip()
741 if sOpcode == "ACK UUID":
742 sUuid = getSZ(abPayload, 0);
743 if sUuid is not None:
744 sUuid = '{%s}' % (sUuid,)
745 try:
746 _ = uuid.UUID(sUuid);
747 rc = sUuid;
748 except:
749 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
750 else:
751 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
752 else:
753 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
754 else:
755 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
756 return rc;
757
758 #
759 # Process task
760 # pylint: disable=missing-docstring
761 #
762
763 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
764 # Construct the payload.
765 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
766 for sArg in asArgs:
767 aoPayload.append('%s' % (sArg));
768 aoPayload.append(long(len(asAddEnv)));
769 for sPutEnv in asAddEnv:
770 aoPayload.append('%s' % (sPutEnv));
771 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
772 if utils.isString(o):
773 aoPayload.append(o);
774 elif o is not None:
775 aoPayload.append('|');
776 o.uTxsClientCrc32 = zlib.crc32(b'');
777 else:
778 aoPayload.append('');
779 aoPayload.append('%s' % (sAsUser));
780 aoPayload.append(long(self.cMsTimeout));
781
782 # Kick of the EXEC command.
783 rc = self.sendMsg('EXEC', aoPayload)
784 if rc is True:
785 rc = self.recvAckLogged('EXEC');
786 if rc is True:
787 # Loop till the process completes, feed input to the TXS and
788 # receive output from it.
789 sFailure = "";
790 msPendingInputReply = None;
791 cbMsg, sOpcode, abPayload = (None, None, None);
792 while True:
793 # Pending input?
794 if msPendingInputReply is None \
795 and oStdIn is not None \
796 and not utils.isString(oStdIn):
797 try:
798 sInput = oStdIn.read(65536);
799 except:
800 reporter.errorXcpt('read standard in');
801 sFailure = 'exception reading stdin';
802 rc = None;
803 break;
804 if sInput:
805 # Convert to a byte array before handing it of to sendMsg or the string
806 # will get some zero termination added breaking the CRC (and injecting
807 # unwanted bytes).
808 abInput = array.array('B', sInput.encode('utf-8'));
809 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
810 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
811 if rc is not True:
812 sFailure = 'sendMsg failure';
813 break;
814 msPendingInputReply = base.timestampMilli();
815 continue;
816
817 rc = self.sendMsg('STDINEOS');
818 oStdIn = None;
819 if rc is not True:
820 sFailure = 'sendMsg failure';
821 break;
822 msPendingInputReply = base.timestampMilli();
823
824 # Wait for input (500 ms timeout).
825 if cbMsg is None:
826 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
827 if cbMsg is None:
828 # Check for time out before restarting the loop.
829 # Note! Only doing timeout checking here does mean that
830 # the TXS may prevent us from timing out by
831 # flooding us with data. This is unlikely though.
832 if self.hasTimedOut() \
833 and ( msPendingInputReply is None \
834 or base.timestampMilli() - msPendingInputReply > 30000):
835 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
836 sFailure = 'timeout';
837 rc = None;
838 break;
839 # Check that the connection is OK.
840 if not self.oTransport.isConnectionOk():
841 self.oTransport.disconnect();
842 sFailure = 'disconnected';
843 rc = False;
844 break;
845 continue;
846
847 # Handle the response.
848 sOpcode = sOpcode.rstrip();
849 if sOpcode == 'STDOUT':
850 oOut = oStdOut;
851 elif sOpcode == 'STDERR':
852 oOut = oStdErr;
853 elif sOpcode == 'TESTPIPE':
854 oOut = oTestPipe;
855 else:
856 oOut = None;
857 if oOut is not None:
858 # Output from the process.
859 if len(abPayload) < 4:
860 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
861 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
862 rc = None;
863 break;
864 uStreamCrc32 = getU32(abPayload, 0);
865 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
866 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
867 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
868 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 try:
873 oOut.write(abPayload[4:]);
874 except:
875 sFailure = 'exception writing %s' % (sOpcode);
876 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
877 rc = None;
878 break;
879 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
880 # Standard input is ignored. Ignore this condition for now.
881 msPendingInputReply = None;
882 reporter.log('taskExecEx: Standard input is ignored... why?');
883 del oStdIn.uTxsClientCrc32;
884 oStdIn = '/dev/null';
885 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
886 and msPendingInputReply is not None:
887 # TXS STDIN error, abort.
888 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
889 msPendingInputReply = None;
890 sFailure = 'TXS is out of memory for std input buffering';
891 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
892 rc = None;
893 break;
894 elif sOpcode == 'ACK' and msPendingInputReply is not None:
895 msPendingInputReply = None;
896 elif sOpcode.startswith('PROC '):
897 # Process status message, handle it outside the loop.
898 rc = True;
899 break;
900 else:
901 sFailure = 'Unexpected opcode %s' % (sOpcode);
902 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
903 rc = None;
904 break;
905 # Clear the message.
906 cbMsg, sOpcode, abPayload = (None, None, None);
907
908 # If we sent an STDIN packet and didn't get a reply yet, we'll give
909 # TXS some 5 seconds to reply to this. If we don't wait here we'll
910 # get screwed later on if we mix it up with the reply to some other
911 # command. Hackish.
912 if msPendingInputReply is not None:
913 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
914 if cbMsg2 is not None:
915 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
916 % (cbMsg2, sOpcode2, abPayload2));
917 msPendingInputReply = None;
918 else:
919 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
920 self.fScrewedUpMsgState = True;
921
922 # Parse the exit status (True), abort (None) or do nothing (False).
923 if rc is True:
924 if sOpcode == 'PROC OK':
925 pass;
926 else:
927 rc = False;
928 # Do proper parsing some other day if needed:
929 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
930 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
931 if sOpcode == 'PROC DOO':
932 reporter.log('taskExecEx: PROC DOO[FUS]: %s' % (abPayload,));
933 elif sOpcode.startswith('PROC NOK'):
934 reporter.log('taskExecEx: PROC NOK: rcExit=%s' % (abPayload,));
935 elif abPayload and sOpcode.startswith('PROC '):
936 reporter.log('taskExecEx: %s payload=%s' % (sOpcode, abPayload,));
937
938 else:
939 if rc is None:
940 # Abort it.
941 reporter.log('taskExecEx: sending ABORT...');
942 rc = self.sendMsg('ABORT');
943 while rc is True:
944 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
945 if cbMsg is None:
946 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
947 self.fScrewedUpMsgState = True;
948 break;
949 if sOpcode.startswith('PROC '):
950 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
951 break;
952 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
953 # Check that the connection is OK before looping.
954 if not self.oTransport.isConnectionOk():
955 self.oTransport.disconnect();
956 break;
957
958 # Fake response with the reason why we quit.
959 if sFailure is not None:
960 self.t3oReply = (0, 'EXECFAIL', sFailure);
961 rc = None;
962 else:
963 rc = None;
964
965 # Cleanup.
966 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
967 if o is not None and not utils.isString(o):
968 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
969 # Make sure all files are closed
970 o.close(); # pylint: disable=maybe-no-member
971 reporter.log('taskExecEx: returns %s' % (rc));
972 return rc;
973
974 #
975 # Admin tasks
976 #
977
978 def hlpRebootShutdownWaitForAck(self, sCmd):
979 """Wait for reboot/shutodwn ACK."""
980 rc = self.recvAckLogged(sCmd);
981 if rc is True:
982 # poll a little while for server to disconnect.
983 uMsStart = base.timestampMilli();
984 while self.oTransport.isConnectionOk() \
985 and base.timestampMilli() - uMsStart >= 5000:
986 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
987 break;
988 self.oTransport.disconnect();
989 return rc;
990
991 def taskReboot(self):
992 rc = self.sendMsg('REBOOT');
993 if rc is True:
994 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
995 return rc;
996
997 def taskShutdown(self):
998 rc = self.sendMsg('SHUTDOWN');
999 if rc is True:
1000 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
1001 return rc;
1002
1003 #
1004 # CD/DVD control tasks.
1005 #
1006
1007 ## TODO
1008
1009 #
1010 # File system tasks
1011 #
1012
1013 def taskMkDir(self, sRemoteDir, fMode):
1014 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
1015 if rc is True:
1016 rc = self.recvAckLogged('MKDIR');
1017 return rc;
1018
1019 def taskMkDirPath(self, sRemoteDir, fMode):
1020 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
1021 if rc is True:
1022 rc = self.recvAckLogged('MKDRPATH');
1023 return rc;
1024
1025 def taskMkSymlink(self, sLinkTarget, sLink):
1026 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
1027 if rc is True:
1028 rc = self.recvAckLogged('MKSYMLNK');
1029 return rc;
1030
1031 def taskRmDir(self, sRemoteDir):
1032 rc = self.sendMsg('RMDIR', (sRemoteDir,));
1033 if rc is True:
1034 rc = self.recvAckLogged('RMDIR');
1035 return rc;
1036
1037 def taskRmFile(self, sRemoteFile):
1038 rc = self.sendMsg('RMFILE', (sRemoteFile,));
1039 if rc is True:
1040 rc = self.recvAckLogged('RMFILE');
1041 return rc;
1042
1043 def taskRmSymlink(self, sRemoteSymlink):
1044 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1045 if rc is True:
1046 rc = self.recvAckLogged('RMSYMLNK');
1047 return rc;
1048
1049 def taskRmTree(self, sRemoteTree):
1050 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1051 if rc is True:
1052 rc = self.recvAckLogged('RMTREE');
1053 return rc;
1054
1055 def taskChMod(self, sRemotePath, fMode):
1056 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1057 if rc is True:
1058 rc = self.recvAckLogged('CHMOD');
1059 return rc;
1060
1061 def taskChOwn(self, sRemotePath, idUser, idGroup):
1062 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1063 if rc is True:
1064 rc = self.recvAckLogged('CHOWN');
1065 return rc;
1066
1067 def taskIsDir(self, sRemoteDir):
1068 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1069 if rc is True:
1070 rc = self.recvTrueFalse('ISDIR');
1071 return rc;
1072
1073 def taskIsFile(self, sRemoteFile):
1074 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1075 if rc is True:
1076 rc = self.recvTrueFalse('ISFILE');
1077 return rc;
1078
1079 def taskIsSymlink(self, sRemoteSymlink):
1080 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1081 if rc is True:
1082 rc = self.recvTrueFalse('ISSYMLNK');
1083 return rc;
1084
1085 #def "STAT "
1086 #def "LSTAT "
1087 #def "LIST "
1088
1089 def taskCopyFile(self, sSrcFile, sDstFile, fMode, fFallbackOkay):
1090 """ Copies a file within the remote from source to destination. """
1091 _ = fFallbackOkay; # Not used yet.
1092 # Note: If fMode is set to 0, it's up to the target OS' implementation with
1093 # what a file mode the destination file gets created (i.e. via umask).
1094 rc = self.sendMsg('CPFILE', (int(fMode), sSrcFile, sDstFile,));
1095 if rc is True:
1096 rc = self.recvAckLogged('CPFILE');
1097 return rc;
1098
1099 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1100 #
1101 # Open the local file (make sure it exist before bothering TXS) and
1102 # tell TXS that we want to upload a file.
1103 #
1104 try:
1105 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1106 except:
1107 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1108 return False;
1109
1110 # Common cause with taskUploadStr
1111 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1112
1113 # Cleanup.
1114 oLocalFile.close();
1115 return rc;
1116
1117 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1118 # Wrap sContent in a file like class.
1119 class InStringFile(object): # pylint: disable=too-few-public-methods
1120 def __init__(self, sContent):
1121 self.sContent = sContent;
1122 self.off = 0;
1123
1124 def read(self, cbMax):
1125 cbLeft = len(self.sContent) - self.off;
1126 if cbLeft == 0:
1127 return "";
1128 if cbLeft <= cbMax:
1129 sRet = self.sContent[self.off:(self.off + cbLeft)];
1130 else:
1131 sRet = self.sContent[self.off:(self.off + cbMax)];
1132 self.off = self.off + len(sRet);
1133 return sRet;
1134
1135 oLocalString = InStringFile(sContent);
1136 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1137
1138 def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
1139 """Common worker used by taskUploadFile and taskUploadString."""
1140 #
1141 # Command + ACK.
1142 #
1143 # Only used the new PUT2FILE command if we've got a non-zero mode mask.
1144 # Fall back on the old command if the new one is not known by the TXS.
1145 #
1146 if fMode == 0:
1147 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1148 if rc is True:
1149 rc = self.recvAckLogged('PUT FILE');
1150 else:
1151 rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
1152 if rc is True:
1153 rc = self.recvAck();
1154 if rc is False:
1155 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
1156 elif rc is not True:
1157 if rc[0] == 'UNKNOWN' and fFallbackOkay:
1158 # Fallback:
1159 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1160 if rc is True:
1161 rc = self.recvAckLogged('PUT FILE');
1162 else:
1163 reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
1164 rc = False;
1165 if rc is True:
1166 #
1167 # Push data packets until eof.
1168 #
1169 uMyCrc32 = zlib.crc32(b'');
1170 while True:
1171 # Read up to 64 KB of data.
1172 try:
1173 sRaw = oLocalFile.read(65536);
1174 except:
1175 rc = None;
1176 break;
1177
1178 # Convert to array - this is silly!
1179 abBuf = array.array('B');
1180 if utils.isString(sRaw):
1181 for i, _ in enumerate(sRaw):
1182 abBuf.append(ord(sRaw[i]));
1183 else:
1184 abBuf.extend(sRaw);
1185 sRaw = None;
1186
1187 # Update the file stream CRC and send it off.
1188 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1189 if not abBuf:
1190 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1191 else:
1192 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1193 if rc is False:
1194 break;
1195
1196 # Wait for the reply.
1197 rc = self.recvAck();
1198 if rc is not True:
1199 if rc is False:
1200 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1201 else:
1202 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1203 rc = False;
1204 break;
1205
1206 # EOF?
1207 if not abBuf:
1208 break;
1209
1210 # Send ABORT on ACK and I/O errors.
1211 if rc is None:
1212 rc = self.sendMsg('ABORT');
1213 if rc is True:
1214 self.recvAckLogged('ABORT');
1215 rc = False;
1216 return rc;
1217
1218 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1219 try:
1220 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1221 except:
1222 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1223 return False;
1224
1225 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1226
1227 oLocalFile.close();
1228 if rc is False:
1229 try:
1230 os.remove(sLocalFile);
1231 except:
1232 reporter.errorXcpt();
1233 return rc;
1234
1235 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1236 # Wrap sContent in a file like class.
1237 class OutStringFile(object): # pylint: disable=too-few-public-methods
1238 def __init__(self):
1239 self.asContent = [];
1240
1241 def write(self, sBuf):
1242 self.asContent.append(sBuf);
1243 return None;
1244
1245 oLocalString = OutStringFile();
1246 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1247 if rc is True:
1248 rc = '';
1249 for sBuf in oLocalString.asContent:
1250 if hasattr(sBuf, 'decode'):
1251 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1252 else:
1253 rc += sBuf;
1254 return rc;
1255
1256 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1257 """Common worker for taskDownloadFile and taskDownloadString."""
1258 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1259 if rc is True:
1260 #
1261 # Process data packets until eof.
1262 #
1263 uMyCrc32 = zlib.crc32(b'');
1264 while rc is True:
1265 cbMsg, sOpcode, abPayload = self.recvReply();
1266 if cbMsg is None:
1267 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1268 rc = None;
1269 break;
1270
1271 # Validate.
1272 sOpcode = sOpcode.rstrip();
1273 if sOpcode not in ('DATA', 'DATA EOF',):
1274 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1275 % (sOpcode, getSZ(abPayload, 0, "None")));
1276 rc = False;
1277 break;
1278 if sOpcode == 'DATA' and len(abPayload) < 4:
1279 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1280 rc = None;
1281 break;
1282 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1283 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1284 rc = None;
1285 break;
1286
1287 # Check the CRC (common for both packets).
1288 uCrc32 = getU32(abPayload, 0);
1289 if sOpcode == 'DATA':
1290 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1291 if uCrc32 != (uMyCrc32 & 0xffffffff):
1292 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1293 % (hex(uMyCrc32), hex(uCrc32)));
1294 rc = None;
1295 break;
1296 if sOpcode == 'DATA EOF':
1297 rc = self.sendMsg('ACK');
1298 break;
1299
1300 # Finally, push the data to the file.
1301 try:
1302 if sys.version_info < (3, 9, 0):
1303 # Removed since Python 3.9.
1304 abData = abPayload[4:].tostring();
1305 else:
1306 abData = abPayload[4:].tobytes();
1307 oLocalFile.write(abData);
1308 except:
1309 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1310 rc = None;
1311 break;
1312 rc = self.sendMsg('ACK');
1313
1314 # Send NACK on validation and I/O errors.
1315 if rc is None:
1316 rc = self.sendMsg('NACK');
1317 rc = False;
1318 return rc;
1319
1320 def taskPackFile(self, sRemoteFile, sRemoteSource):
1321 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1322 if rc is True:
1323 rc = self.recvAckLogged('PKFILE');
1324 return rc;
1325
1326 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1327 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1328 if rc is True:
1329 rc = self.recvAckLogged('UNPKFILE');
1330 return rc;
1331
1332 def taskExpandString(self, sString):
1333 rc = self.sendMsg('EXP STR ', (sString,));
1334 if rc is True:
1335 rc = False;
1336 cbMsg, sOpcode, abPayload = self.recvReply();
1337 if cbMsg is not None:
1338 sOpcode = sOpcode.strip();
1339 if sOpcode == "STRING":
1340 sStringExp = getSZ(abPayload, 0);
1341 if sStringExp is not None:
1342 rc = sStringExp;
1343 else: # Also handles SHORTSTR reply (not enough space to store result).
1344 reporter.maybeErr(self.fErr, 'taskExpandString got a bad reply: %s' % (sOpcode,));
1345 else:
1346 reporter.maybeErr(self.fErr, 'taskExpandString got 3xNone from recvReply.');
1347 return rc;
1348
1349 # pylint: enable=missing-docstring
1350
1351
1352 #
1353 # Public methods - generic task queries
1354 #
1355
1356 def isSuccess(self):
1357 """Returns True if the task completed successfully, otherwise False."""
1358 self.lockTask();
1359 sStatus = self.sStatus;
1360 oTaskRc = self.oTaskRc;
1361 self.unlockTask();
1362 if sStatus != "":
1363 return False;
1364 if oTaskRc is False or oTaskRc is None:
1365 return False;
1366 return True;
1367
1368 def getResult(self):
1369 """
1370 Returns the result of a completed task.
1371 Returns None if not completed yet or no previous task.
1372 """
1373 self.lockTask();
1374 sStatus = self.sStatus;
1375 oTaskRc = self.oTaskRc;
1376 self.unlockTask();
1377 if sStatus != "":
1378 return None;
1379 return oTaskRc;
1380
1381 def getLastReply(self):
1382 """
1383 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1384 Returns a None, None, None three-tuple if there was no last reply.
1385 """
1386 self.lockTask();
1387 t3oReply = self.t3oReply;
1388 self.unlockTask();
1389 return t3oReply;
1390
1391 #
1392 # Public methods - connection.
1393 #
1394
1395 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1396 """
1397 Initiates a disconnect task.
1398
1399 Returns True on success, False on failure (logged).
1400
1401 The task returns True on success and False on failure.
1402 """
1403 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1404
1405 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1406 """Synchronous version."""
1407 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1408
1409 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1410 """
1411 Initiates a task for getting the TXS version information.
1412
1413 Returns True on success, False on failure (logged).
1414
1415 The task returns the version string on success and False on failure.
1416 """
1417 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1418
1419 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1420 """Synchronous version."""
1421 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1422
1423 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1424 """
1425 Initiates a task for getting the TXS UUID.
1426
1427 Returns True on success, False on failure (logged).
1428
1429 The task returns UUID string (in {}) on success and False on failure.
1430 """
1431 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1432
1433 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1434 """Synchronous version."""
1435 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1436
1437 #
1438 # Public methods - execution.
1439 #
1440
1441 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1442 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1443 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1444 """
1445 Initiates a exec process task.
1446
1447 Returns True on success, False on failure (logged).
1448
1449 The task returns True if the process exited normally with status code 0.
1450 The task returns None if on failure prior to executing the process, and
1451 False if the process exited with a different status or in an abnormal
1452 manner. Both None and False are logged of course and further info can
1453 also be obtained by getLastReply().
1454
1455 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1456 these streams. If None, no special action is taken and the output goes
1457 to where ever the TXS sends its output, and ditto for input.
1458 - To send to / read from the bitbucket, pass '/dev/null'.
1459 - To redirect to/from a file, just specify the remote filename.
1460 - To append to a file use '>>' followed by the remote filename.
1461 - To pipe the stream to/from the TXS, specify a file like
1462 object. For StdIn a non-blocking read() method is required. For
1463 the other a write() method is required. Watch out for deadlock
1464 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1465 """
1466 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1467 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1468 oStdOut, oStdErr, oTestPipe, sAsUser));
1469
1470 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1471 oStdIn = '/dev/null', oStdOut = '/dev/null',
1472 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1473 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1474 """Synchronous version."""
1475 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1476 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1477
1478 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1479 cMsTimeout = 3600000, fIgnoreErrors = False):
1480 """
1481 Initiates a exec process test task.
1482
1483 Returns True on success, False on failure (logged).
1484
1485 The task returns True if the process exited normally with status code 0.
1486 The task returns None if on failure prior to executing the process, and
1487 False if the process exited with a different status or in an abnormal
1488 manner. Both None and False are logged of course and further info can
1489 also be obtained by getLastReply().
1490
1491 Standard in is taken from /dev/null. While both standard output and
1492 standard error goes directly to reporter.log(). The testpipe is piped
1493 to reporter.xxxx.
1494 """
1495
1496 sStdIn = '/dev/null';
1497 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1498 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1499 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1500 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1501
1502 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1503 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1504
1505 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1506 cMsTimeout = 3600000, fIgnoreErrors = False):
1507 """Synchronous version."""
1508 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1509 cMsTimeout, fIgnoreErrors);
1510
1511 #
1512 # Public methods - system
1513 #
1514
1515 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1516 """
1517 Initiates a reboot task.
1518
1519 Returns True on success, False on failure (logged).
1520
1521 The task returns True on success, False on failure (logged). The
1522 session will be disconnected on successful task completion.
1523 """
1524 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1525
1526 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1527 """Synchronous version."""
1528 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1529
1530 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1531 """
1532 Initiates a shutdown task.
1533
1534 Returns True on success, False on failure (logged).
1535
1536 The task returns True on success, False on failure (logged).
1537 """
1538 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1539
1540 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1541 """Synchronous version."""
1542 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1543
1544
1545 #
1546 # Public methods - file system
1547 #
1548
1549 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1550 """
1551 Initiates a mkdir task.
1552
1553 Returns True on success, False on failure (logged).
1554
1555 The task returns True on success, False on failure (logged).
1556 """
1557 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1558
1559 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1560 """Synchronous version."""
1561 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1562
1563 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1564 """
1565 Initiates a mkdir -p task.
1566
1567 Returns True on success, False on failure (logged).
1568
1569 The task returns True on success, False on failure (logged).
1570 """
1571 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1572
1573 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1574 """Synchronous version."""
1575 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1576
1577 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1578 """
1579 Initiates a symlink task.
1580
1581 Returns True on success, False on failure (logged).
1582
1583 The task returns True on success, False on failure (logged).
1584 """
1585 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1586
1587 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1588 """Synchronous version."""
1589 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1590
1591 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1592 """
1593 Initiates a rmdir task.
1594
1595 Returns True on success, False on failure (logged).
1596
1597 The task returns True on success, False on failure (logged).
1598 """
1599 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1600
1601 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1602 """Synchronous version."""
1603 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1604
1605 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1606 """
1607 Initiates a rmfile task.
1608
1609 Returns True on success, False on failure (logged).
1610
1611 The task returns True on success, False on failure (logged).
1612 """
1613 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1614
1615 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1616 """Synchronous version."""
1617 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1618
1619 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1620 """
1621 Initiates a rmsymlink task.
1622
1623 Returns True on success, False on failure (logged).
1624
1625 The task returns True on success, False on failure (logged).
1626 """
1627 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1628
1629 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1630 """Synchronous version."""
1631 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1632
1633 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1634 """
1635 Initiates a rmtree task.
1636
1637 Returns True on success, False on failure (logged).
1638
1639 The task returns True on success, False on failure (logged).
1640 """
1641 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1642
1643 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1644 """Synchronous version."""
1645 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1646
1647 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1648 """
1649 Initiates a chmod task.
1650
1651 Returns True on success, False on failure (logged).
1652
1653 The task returns True on success, False on failure (logged).
1654 """
1655 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1656
1657 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1658 """Synchronous version."""
1659 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1660
1661 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1662 """
1663 Initiates a chown task.
1664
1665 Returns True on success, False on failure (logged).
1666
1667 The task returns True on success, False on failure (logged).
1668 """
1669 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1670
1671 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1672 """Synchronous version."""
1673 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1674
1675 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1676 """
1677 Initiates a is-dir query task.
1678
1679 Returns True on success, False on failure (logged).
1680
1681 The task returns True if it's a directory, False if it isn't, and
1682 None on error (logged).
1683 """
1684 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1685
1686 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1687 """Synchronous version."""
1688 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1689
1690 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1691 """
1692 Initiates a is-file query task.
1693
1694 Returns True on success, False on failure (logged).
1695
1696 The task returns True if it's a file, False if it isn't, and None on
1697 error (logged).
1698 """
1699 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1700
1701 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1702 """Synchronous version."""
1703 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1704
1705 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1706 """
1707 Initiates a is-symbolic-link query task.
1708
1709 Returns True on success, False on failure (logged).
1710
1711 The task returns True if it's a symbolic linke, False if it isn't, and
1712 None on error (logged).
1713 """
1714 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1715
1716 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1717 """Synchronous version."""
1718 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1719
1720 #def "STAT "
1721 #def "LSTAT "
1722 #def "LIST "
1723
1724 @staticmethod
1725 def calcFileXferTimeout(cbFile):
1726 """
1727 Calculates a reasonable timeout for an upload/download given the file size.
1728
1729 Returns timeout in milliseconds.
1730 """
1731 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1732
1733 @staticmethod
1734 def calcUploadTimeout(sLocalFile):
1735 """
1736 Calculates a reasonable timeout for an upload given the file (will stat it).
1737
1738 Returns timeout in milliseconds.
1739 """
1740 try: cbFile = os.path.getsize(sLocalFile);
1741 except: cbFile = 1024*1024;
1742 return Session.calcFileXferTimeout(cbFile);
1743
1744 def asyncCopyFile(self, sSrcFile, sDstFile,
1745 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1746 """
1747 Initiates a file copying task on the remote.
1748
1749 Returns True on success, False on failure (logged).
1750
1751 The task returns True on success, False on failure (logged).
1752 """
1753 return self.startTask(cMsTimeout, fIgnoreErrors, "cpfile",
1754 self.taskCopyFile, (sSrcFile, sDstFile, fMode, fFallbackOkay));
1755
1756 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1757 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1758 """
1759 Initiates a download query task.
1760
1761 Returns True on success, False on failure (logged).
1762
1763 The task returns True on success, False on failure (logged).
1764 """
1765 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1766 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1767
1768 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1769 """Synchronous version."""
1770 if cMsTimeout <= 0:
1771 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1772 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1773
1774 def asyncUploadString(self, sContent, sRemoteFile,
1775 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1776 """
1777 Initiates a upload string task.
1778
1779 Returns True on success, False on failure (logged).
1780
1781 The task returns True on success, False on failure (logged).
1782 """
1783 if cMsTimeout <= 0:
1784 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1785 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1786 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1787
1788 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1789 """Synchronous version."""
1790 if cMsTimeout <= 0:
1791 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1792 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1793
1794 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1795 """
1796 Initiates a download file task.
1797
1798 Returns True on success, False on failure (logged).
1799
1800 The task returns True on success, False on failure (logged).
1801 """
1802 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1803
1804 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1805 """Synchronous version."""
1806 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1807
1808 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1809 cMsTimeout = 30000, fIgnoreErrors = False):
1810 """
1811 Initiates a download string task.
1812
1813 Returns True on success, False on failure (logged).
1814
1815 The task returns a byte string on success, False on failure (logged).
1816 """
1817 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1818 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1819
1820 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1821 cMsTimeout = 30000, fIgnoreErrors = False):
1822 """Synchronous version."""
1823 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1824 cMsTimeout, fIgnoreErrors);
1825
1826 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1827 """
1828 Initiates a packing file/directory task.
1829
1830 Returns True on success, False on failure (logged).
1831
1832 The task returns True on success, False on failure (logged).
1833 """
1834 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1835 (sRemoteFile, sRemoteSource));
1836
1837 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1838 """Synchronous version."""
1839 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1840
1841 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1842 """
1843 Initiates a unpack file task.
1844
1845 Returns True on success, False on failure (logged).
1846
1847 The task returns True on success, False on failure (logged).
1848 """
1849 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1850 (sRemoteFile, sRemoteDir));
1851
1852 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1853 """Synchronous version."""
1854 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1855
1856 def asyncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
1857 """
1858 Initiates an expand string task.
1859
1860 Returns expanded string on success, False on failure (logged).
1861
1862 The task returns True on success, False on failure (logged).
1863 """
1864 return self.startTask(cMsTimeout, fIgnoreErrors, "expandString",
1865 self.taskExpandString, (sString,));
1866
1867 def syncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
1868 """Synchronous version."""
1869 return self.asyncToSync(self.asyncExpandString, sString, cMsTimeout, fIgnoreErrors);
1870
1871
1872class TransportTcp(TransportBase):
1873 """
1874 TCP transport layer for the TXS client session class.
1875 """
1876
1877 def __init__(self, sHostname, uPort, fReversedSetup):
1878 """
1879 Save the parameters. The session will call us back to make the
1880 connection later on its worker thread.
1881 """
1882 TransportBase.__init__(self, utils.getCallerName());
1883 self.sHostname = sHostname;
1884 self.fReversedSetup = fReversedSetup;
1885 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1886 self.oSocket = None;
1887 self.oWakeupW = None;
1888 self.oWakeupR = None;
1889 self.fConnectCanceled = False;
1890 self.fIsConnecting = False;
1891 self.oCv = threading.Condition();
1892 self.abReadAhead = array.array('B');
1893
1894 def toString(self):
1895 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1896 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1897 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1898 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1899
1900 def __isInProgressXcpt(self, oXcpt):
1901 """ In progress exception? """
1902 try:
1903 if isinstance(oXcpt, socket.error):
1904 try:
1905 if oXcpt.errno == errno.EINPROGRESS:
1906 return True;
1907 except: pass;
1908 # Windows?
1909 try:
1910 if oXcpt.errno == errno.EWOULDBLOCK:
1911 return True;
1912 except: pass;
1913 except:
1914 pass;
1915 return False;
1916
1917 def __isWouldBlockXcpt(self, oXcpt):
1918 """ Would block exception? """
1919 try:
1920 if isinstance(oXcpt, socket.error):
1921 try:
1922 if oXcpt.errno == errno.EWOULDBLOCK:
1923 return True;
1924 except: pass;
1925 try:
1926 if oXcpt.errno == errno.EAGAIN:
1927 return True;
1928 except: pass;
1929 except:
1930 pass;
1931 return False;
1932
1933 def __isConnectionReset(self, oXcpt):
1934 """ Connection reset by Peer or others. """
1935 try:
1936 if isinstance(oXcpt, socket.error):
1937 try:
1938 if oXcpt.errno == errno.ECONNRESET:
1939 return True;
1940 except: pass;
1941 try:
1942 if oXcpt.errno == errno.ENETRESET:
1943 return True;
1944 except: pass;
1945 except:
1946 pass;
1947 return False;
1948
1949 def _closeWakeupSockets(self):
1950 """ Closes the wakup sockets. Caller should own the CV. """
1951 oWakeupR = self.oWakeupR;
1952 self.oWakeupR = None;
1953 if oWakeupR is not None:
1954 oWakeupR.close();
1955
1956 oWakeupW = self.oWakeupW;
1957 self.oWakeupW = None;
1958 if oWakeupW is not None:
1959 oWakeupW.close();
1960
1961 return None;
1962
1963 def cancelConnect(self):
1964 # This is bad stuff.
1965 self.oCv.acquire();
1966 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1967 self.fConnectCanceled = True;
1968 if self.fIsConnecting:
1969 oSocket = self.oSocket;
1970 self.oSocket = None;
1971 if oSocket is not None:
1972 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1973 oSocket.close();
1974
1975 oWakeupW = self.oWakeupW;
1976 self.oWakeupW = None;
1977 if oWakeupW is not None:
1978 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1979 try: oWakeupW.send(b'cancelled!\n');
1980 except: reporter.logXcpt();
1981 try: oWakeupW.shutdown(socket.SHUT_WR);
1982 except: reporter.logXcpt();
1983 oWakeupW.close();
1984 self.oCv.release();
1985
1986 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1987 """ Connects to the TXS server as server, i.e. the reversed setup. """
1988 assert(self.fReversedSetup);
1989
1990 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1991
1992 # Workaround for bind() failure...
1993 try:
1994 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1995 except:
1996 reporter.errorXcpt('socket.listen(1) failed');
1997 return None;
1998
1999 # Bind the socket and make it listen.
2000 try:
2001 oSocket.bind((self.sHostname, self.uPort));
2002 except:
2003 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
2004 return None;
2005 try:
2006 oSocket.listen(1);
2007 except:
2008 reporter.errorXcpt('socket.listen(1) failed');
2009 return None;
2010
2011 # Accept connections.
2012 oClientSocket = None;
2013 tClientAddr = None;
2014 try:
2015 (oClientSocket, tClientAddr) = oSocket.accept();
2016 except socket.error as e:
2017 if not self.__isInProgressXcpt(e):
2018 raise;
2019
2020 # Do the actual waiting.
2021 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
2022 try:
2023 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
2024 except socket.error as oXctp:
2025 if oXctp.errno != errno.EBADF or not self.fConnectCanceled:
2026 raise;
2027 reporter.log('socket.select() on accept was canceled');
2028 return None;
2029 except:
2030 reporter.logXcpt('socket.select() on accept');
2031
2032 # Try accept again.
2033 try:
2034 (oClientSocket, tClientAddr) = oSocket.accept();
2035 except socket.error as oXcpt:
2036 if not self.__isInProgressXcpt(e):
2037 if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
2038 raise;
2039 reporter.log('socket.accept() was canceled');
2040 return None;
2041 reporter.log('socket.accept() timed out');
2042 return False;
2043 except:
2044 reporter.errorXcpt('socket.accept() failed');
2045 return None;
2046 except:
2047 reporter.errorXcpt('socket.accept() failed');
2048 return None;
2049
2050 # Store the connected socket and throw away the server socket.
2051 self.oCv.acquire();
2052 if not self.fConnectCanceled:
2053 self.oSocket.close();
2054 self.oSocket = oClientSocket;
2055 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
2056 self.oCv.release();
2057 return True;
2058
2059 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
2060 """ Connects to the TXS server as client. """
2061 assert(not self.fReversedSetup);
2062
2063 # Connect w/ timeouts.
2064 rc = None;
2065 try:
2066 oSocket.connect((self.sHostname, self.uPort));
2067 rc = True;
2068 except socket.error as oXcpt:
2069 iRc = oXcpt.errno;
2070 if self.__isInProgressXcpt(oXcpt):
2071 # Do the actual waiting.
2072 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
2073 try:
2074 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
2075 if len(ttRc[1]) + len(ttRc[2]) == 0:
2076 raise socket.error(errno.ETIMEDOUT, 'select timed out');
2077 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
2078 rc = iRc == 0;
2079 except socket.error as oXcpt2:
2080 iRc = oXcpt2.errno;
2081 except:
2082 iRc = -42;
2083 reporter.fatalXcpt('socket.select() on connect failed');
2084
2085 if rc is True:
2086 pass;
2087 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
2088 rc = False; # try again.
2089 else:
2090 if iRc != errno.EBADF or not self.fConnectCanceled:
2091 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
2092 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
2093 except:
2094 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
2095 return rc;
2096
2097
2098 def connect(self, cMsTimeout):
2099 # Create a non-blocking socket.
2100 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
2101 try:
2102 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
2103 except:
2104 reporter.fatalXcpt('socket.socket() failed');
2105 return None;
2106 try:
2107 oSocket.setblocking(0);
2108 except:
2109 oSocket.close();
2110 reporter.fatalXcpt('socket.socket() failed');
2111 return None;
2112
2113 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
2114 oWakeupR = None;
2115 oWakeupW = None;
2116 if hasattr(socket, 'socketpair'):
2117 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
2118 except: reporter.logXcpt('socket.socketpair() failed');
2119
2120 # Update the state.
2121 self.oCv.acquire();
2122 rc = None;
2123 if not self.fConnectCanceled:
2124 self.oSocket = oSocket;
2125 self.oWakeupW = oWakeupW;
2126 self.oWakeupR = oWakeupR;
2127 self.fIsConnecting = True;
2128 self.oCv.release();
2129
2130 # Try connect.
2131 if oWakeupR is None:
2132 oWakeupR = oSocket; # Avoid select failure.
2133 if self.fReversedSetup:
2134 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
2135 else:
2136 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
2137 oSocket = None;
2138
2139 # Update the state and cleanup on failure/cancel.
2140 self.oCv.acquire();
2141 if rc is True and self.fConnectCanceled:
2142 rc = False;
2143 self.fIsConnecting = False;
2144
2145 if rc is not True:
2146 if self.oSocket is not None:
2147 self.oSocket.close();
2148 self.oSocket = None;
2149 self._closeWakeupSockets();
2150 self.oCv.release();
2151
2152 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
2153 return rc;
2154
2155 def disconnect(self, fQuiet = False):
2156 if self.oSocket is not None:
2157 self.abReadAhead = array.array('B');
2158
2159 # Try a shutting down the socket gracefully (draining it).
2160 try:
2161 self.oSocket.shutdown(socket.SHUT_WR);
2162 except:
2163 if not fQuiet:
2164 reporter.error('shutdown(SHUT_WR)');
2165 try:
2166 self.oSocket.setblocking(0); # just in case it's not set.
2167 sData = "1";
2168 while sData:
2169 sData = self.oSocket.recv(16384);
2170 except:
2171 pass;
2172
2173 # Close it.
2174 self.oCv.acquire();
2175 try: self.oSocket.setblocking(1);
2176 except: pass;
2177 self.oSocket.close();
2178 self.oSocket = None;
2179 else:
2180 self.oCv.acquire();
2181 self._closeWakeupSockets();
2182 self.oCv.release();
2183
2184 def sendBytes(self, abBuf, cMsTimeout):
2185 if self.oSocket is None:
2186 reporter.error('TransportTcp.sendBytes: No connection.');
2187 return False;
2188
2189 # Try send it all.
2190 try:
2191 cbSent = self.oSocket.send(abBuf);
2192 if cbSent == len(abBuf):
2193 return True;
2194 except Exception as oXcpt:
2195 if not self.__isWouldBlockXcpt(oXcpt):
2196 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2197 return False;
2198 cbSent = 0;
2199
2200 # Do a timed send.
2201 msStart = base.timestampMilli();
2202 while True:
2203 cMsElapsed = base.timestampMilli() - msStart;
2204 if cMsElapsed > cMsTimeout:
2205 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
2206 break;
2207
2208 # wait.
2209 try:
2210 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2211 if ttRc[2] and not ttRc[1]:
2212 reporter.error('TranportTcp.sendBytes: select returned with exception');
2213 break;
2214 if not ttRc[1]:
2215 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
2216 break;
2217 except:
2218 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
2219 break;
2220
2221 # Try send more.
2222 try:
2223 cbSent += self.oSocket.send(abBuf[cbSent:]);
2224 if cbSent == len(abBuf):
2225 return True;
2226 except Exception as oXcpt:
2227 if not self.__isWouldBlockXcpt(oXcpt):
2228 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2229 break;
2230
2231 return False;
2232
2233 def __returnReadAheadBytes(self, cb):
2234 """ Internal worker for recvBytes. """
2235 assert(len(self.abReadAhead) >= cb);
2236 abRet = self.abReadAhead[:cb];
2237 self.abReadAhead = self.abReadAhead[cb:];
2238 return abRet;
2239
2240 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2241 if self.oSocket is None:
2242 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2243 return None;
2244
2245 # Try read in some more data without bothering with timeout handling first.
2246 if len(self.abReadAhead) < cb:
2247 try:
2248 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2249 if abBuf:
2250 self.abReadAhead.extend(array.array('B', abBuf));
2251 except Exception as oXcpt:
2252 if not self.__isWouldBlockXcpt(oXcpt):
2253 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2254 return None;
2255
2256 if len(self.abReadAhead) >= cb:
2257 return self.__returnReadAheadBytes(cb);
2258
2259 # Timeout loop.
2260 msStart = base.timestampMilli();
2261 while True:
2262 cMsElapsed = base.timestampMilli() - msStart;
2263 if cMsElapsed > cMsTimeout:
2264 if not fNoDataOk or self.abReadAhead:
2265 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2266 break;
2267
2268 # Wait.
2269 try:
2270 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2271 if ttRc[2] and not ttRc[0]:
2272 reporter.error('TranportTcp.recvBytes: select returned with exception');
2273 break;
2274 if not ttRc[0]:
2275 if not fNoDataOk or self.abReadAhead:
2276 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2277 % (len(self.abReadAhead), cb, fNoDataOk));
2278 break;
2279 except:
2280 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2281 break;
2282
2283 # Try read more.
2284 try:
2285 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2286 if not abBuf:
2287 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2288 % (len(self.abReadAhead), cb, fNoDataOk));
2289 self.disconnect();
2290 return None;
2291
2292 self.abReadAhead.extend(array.array('B', abBuf));
2293
2294 except Exception as oXcpt:
2295 reporter.log('recv => exception %s' % (oXcpt,));
2296 if not self.__isWouldBlockXcpt(oXcpt):
2297 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2298 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2299 break;
2300
2301 # Done?
2302 if len(self.abReadAhead) >= cb:
2303 return self.__returnReadAheadBytes(cb);
2304
2305 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2306 return None;
2307
2308 def isConnectionOk(self):
2309 if self.oSocket is None:
2310 return False;
2311 try:
2312 ttRc = select.select([], [], [self.oSocket], 0.0);
2313 if ttRc[2]:
2314 return False;
2315
2316 self.oSocket.send(array.array('B')); # send zero bytes.
2317 except:
2318 return False;
2319 return True;
2320
2321 def isRecvPending(self, cMsTimeout = 0):
2322 try:
2323 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2324 if not ttRc[0]:
2325 return False;
2326 except:
2327 pass;
2328 return True;
2329
2330
2331def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2332 """
2333 Opens a connection to a Test Execution Service via TCP, given its name.
2334
2335 The optional fnProcessEvents callback should be set to vbox.processPendingEvents
2336 or similar.
2337 """
2338 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
2339 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2340 try:
2341 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2342 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
2343 except:
2344 reporter.errorXcpt(None, 15);
2345 return None;
2346 return oSession;
2347
2348
2349def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
2350 """
2351 Tries to open a connection to a Test Execution Service via TCP, given its name.
2352
2353 This differs from openTcpSession in that it won't log a connection failure
2354 as an error.
2355 """
2356 try:
2357 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2358 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True, fnProcessEvents = fnProcessEvents);
2359 except:
2360 reporter.errorXcpt(None, 15);
2361 return None;
2362 return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette