VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 84764

Last change on this file since 84764 was 84764, checked in by vboxsync, 5 years ago

Validation Kit/txsclient: Added support for creating (packing) files / directories on the guest.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 85.3 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 84764 2020-06-10 15:37:14Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2020 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 84764 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
# Python 3 hacks:
# Python 3 has no separate 'long' type.  Alias it to int so that the
# long(...) conversions and the isinstance(o, (long, int)) checks further
# down in this file keep working on both major versions.
if sys.version_info[0] >= 3:
    long = int;     # pylint: disable=redefined-builtin,invalid-name
52
#
# Helpers for decoding data received from the TXS.
# These are used by both the Session and Transport classes.
#
57
def getU32(abData, off):
    """
    Decodes the little endian unsigned 32-bit field found at abData[off].

    abData: Indexable sequence of byte values (0..255).
    off:    Offset of the first (least significant) byte.

    Returns the decoded integer.  Raises IndexError if fewer than four
    bytes are available at off.
    """
    u32 = 0;
    for iByte in range(4):
        # Byte number iByte is worth 256 ** iByte (1, 256, 65536, 16777216).
        u32 += abData[off + iByte] * (256 ** iByte);
    return u32;
64
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated UTF-8 string field.

    abData:     The packet data, an array('B').
    off:        Offset of the first byte of the string.
    sDefault:   What to return if the string cannot be extracted.

    Returns the decoded string, terminator excluded.
    Returns sDefault if off is beyond the data, if the terminator is missing
    or if the bytes cannot be decoded (the latter is logged).
    """
    cchStr = getSZLen(abData, off);
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)];
        try:
            # array.tostring() was removed in Python 3.9, tobytes() is its
            # replacement.  Python 2 only knows tostring(), so probe for it.
            if hasattr(abStr, 'tobytes'):
                return abStr.tobytes().decode('utf_8');
            return abStr.tostring().decode('utf_8');
        except Exception: # pylint: disable=broad-except
            reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;

def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    abData:     The packet data (indexable sequence of byte values).
    off:        Offset of the first byte of the string.

    Returns the number of bytes before the terminator.
    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    # An off at or beyond the end yields an empty range and thus -1.
    for offCur in range(off, len(abData)):
        if abData[offCur] == 0:
            return offCur - off;
    return -1;
95
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters, the remaining six may also be digits, dash,
    underscore or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # Note! The original loops used range(0, 1) and range(2, 7), which, the
    #       end of a range being exclusive, left characters #1 and #7
    #       unchecked.  All eight positions are validated here.
    for i, ch in enumerate(sOpcode):
        sValidChars = sSet1 if i < 2 else sSet2;
        if ch not in sValidChars:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Only the low 32 bits are encoded; each byte is taken modulo 256, so
    negative values come out in two's complement form.

    Returns an array('B') with four elements, least significant byte first.
    """
    abRet = array.array('B');
    uRest = u32;
    for _ in range(4):
        # Peel off the lowest byte and keep the rest for the next round.
        uRest, bLow = divmod(uRest, 256);
        abRet.append(bLow);
    return abRet;
128
def escapeString(sString):
    """
    Doubles every '$' in the string so that TXS does not attempt variable
    expansion on it.

    Returns the escaped string.
    """
    # Splitting on '$' and re-joining with '$$' doubles each occurrence.
    return '$$'.join(sString.split('$'));
134
135
136
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses implement the raw byte transfer (connect, disconnect,
    sendBytes, recvBytes, isConnectionOk and isRecvPending).  This class
    implements the message framing on top of that (sendMsg, sendMsgInt and
    recvMsg).

    A message, as produced and consumed by the code below, consists of:
        - a little endian u32 with the message length (header + payload,
          padding excluded),
        - a little endian u32 with the CRC-32 of the opcode and payload,
        - the opcode, 8 characters, space padded,
        - the payload (optional),
        - zero padding up to a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        """
        sCaller:    Name of whoever creates the transport, only used in the
                    debug string.
        """
        self.sDbgCreated    = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy         = 0;
        # Header of a message whose payload could not be received.  recvMsg
        # resumes from it on the next call instead of reading a new header.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there are incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there are, False if there aren't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters.  It is space padded to 8.
        cMsTimeout: The timeout passed on to sendBytes.
        abPayload:  The already encoded payload.  The shared default array
                    is only read, never modified.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start constructing the message.  The CRC field is filled in below.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32
        try:
            # The opcode is exactly 8 characters here (validated above).
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except Exception: # pylint: disable=broad-except
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload), pad it and send it off.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: The timeout passed on to recvBytes.
        fNoDataOk:  Passed on to recvBytes when reading the header; it also
                    suppresses the log entry when the payload is missing.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, or pick up the one left over from a previous call
        # that failed to get the payload.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg    = getU32(abHdr, 0);
        uCrc32   = getU32(abHdr, 4);
        abOpcode = abHdr[8:16];
        try:
            # array.tostring() was removed in Python 3.9, tobytes() is its
            # replacement.  Python 2 only knows tostring(), so probe for it.
            if hasattr(abOpcode, 'tobytes'):
                sOpcode = abOpcode.tobytes().decode('ascii');
            else:
                sOpcode = abOpcode.tostring().decode('ascii');
        except UnicodeError:
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abOpcode,));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32.  A zero CRC in the header skips the check.
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        The payload items are encoded as follows:
            - strings: UTF-8 followed by a zero terminator,
            - integers: little endian u32 (must be within 0..0xffffffff),
            - array.array objects: appended as they are.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'));
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8');
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0);
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except Exception: # pylint: disable=broad-except
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
387
388
389class Session(TdTaskBase):
390 """
391 A Test eXecution Service (TXS) client session.
392 """
393
394 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
395 """
396 Construct a TXS session.
397
398 This starts by connecting to the TXS and will enter the signalled state
399 when connected or the timeout has been reached.
400 """
401 TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
402 self.oTransport = oTransport;
403 self.sStatus = "";
404 self.cMsTimeout = 0;
405 self.fErr = True; # Whether to report errors as error.
406 self.msStart = 0;
407 self.oThread = None;
408 self.fnTask = self.taskDummy;
409 self.aTaskArgs = None;
410 self.oTaskRc = None;
411 self.t3oReply = (None, None, None);
412 self.fScrewedUpMsgState = False;
413 self.fTryConnect = fTryConnect;
414
415 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
416 raise base.GenError("startTask failed");
417
418 def __del__(self):
419 """Make sure to cancel the task when deleted."""
420 self.cancelTask();
421
422 def toString(self):
423 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
424 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
425 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
426 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
427
428 def taskDummy(self):
429 """Place holder to catch broken state handling."""
430 raise Exception();
431
432 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
433 """
434 Kicks of a new task.
435
436 cMsTimeout: The task timeout in milliseconds. Values less than
437 500 ms will be adjusted to 500 ms. This means it is
438 OK to use negative value.
439 sStatus: The task status.
440 fnTask: The method that'll execute the task.
441 aArgs: Arguments to pass to fnTask.
442
443 Returns True on success, False + error in log on failure.
444 """
445 if not self.cancelTask():
446 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
447 return False;
448
449 # Change status and make sure we're the
450 self.lockTask();
451 if self.sStatus != "":
452 self.unlockTask();
453 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
454 return False;
455 self.sStatus = "setup";
456 self.oTaskRc = None;
457 self.t3oReply = (None, None, None);
458 self.resetTaskLocked();
459 self.unlockTask();
460
461 self.cMsTimeout = max(cMsTimeout, 500);
462 self.fErr = not fIgnoreErrors;
463 self.fnTask = fnTask;
464 self.aTaskArgs = aArgs;
465 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
466 self.oThread.setDaemon(True);
467 self.msStart = base.timestampMilli();
468
469 self.lockTask();
470 self.sStatus = sStatus;
471 self.unlockTask();
472 self.oThread.start();
473
474 return True;
475
476 def cancelTask(self, fSync = True):
477 """
478 Attempts to cancel any pending tasks.
479 Returns success indicator (True/False).
480 """
481 self.lockTask();
482
483 if self.sStatus == "":
484 self.unlockTask();
485 return True;
486 if self.sStatus == "setup":
487 self.unlockTask();
488 return False;
489 if self.sStatus == "cancelled":
490 self.unlockTask();
491 return False;
492
493 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
494 if self.sStatus == 'connecting':
495 self.oTransport.cancelConnect();
496
497 self.sStatus = "cancelled";
498 oThread = self.oThread;
499 self.unlockTask();
500
501 if not fSync:
502 return False;
503
504 oThread.join(61.0);
505 return oThread.isAlive();
506
507 def taskThread(self):
508 """
509 The task thread function.
510 This does some housekeeping activities around the real task method call.
511 """
512 if not self.isCancelled():
513 try:
514 fnTask = self.fnTask;
515 oTaskRc = fnTask(*self.aTaskArgs);
516 except:
517 reporter.fatalXcpt('taskThread', 15);
518 oTaskRc = None;
519 else:
520 reporter.log('taskThread: cancelled already');
521
522 self.lockTask();
523
524 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
525 self.oTaskRc = oTaskRc;
526 self.oThread = None;
527 self.sStatus = '';
528 self.signalTaskLocked();
529
530 self.unlockTask();
531 return None;
532
533 def isCancelled(self):
534 """Internal method for checking if the task has been cancelled."""
535 self.lockTask();
536 sStatus = self.sStatus;
537 self.unlockTask();
538 if sStatus == "cancelled":
539 return True;
540 return False;
541
542 def hasTimedOut(self):
543 """Internal method for checking if the task has timed out or not."""
544 cMsLeft = self.getMsLeft();
545 if cMsLeft <= 0:
546 return True;
547 return False;
548
549 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
550 """Gets the time left until the timeout."""
551 cMsElapsed = base.timestampMilli() - self.msStart;
552 if cMsElapsed < 0:
553 return cMsMin;
554 cMsLeft = self.cMsTimeout - cMsElapsed;
555 if cMsLeft <= cMsMin:
556 return cMsMin;
557 if cMsLeft > cMsMax > 0:
558 return cMsMax
559 return cMsLeft;
560
561 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
562 """
563 Wrapper for TransportBase.recvMsg that stashes the response away
564 so the client can inspect it later on.
565 """
566 if cMsTimeout is None:
567 cMsTimeout = self.getMsLeft(500);
568 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
569 self.lockTask();
570 self.t3oReply = (cbMsg, sOpcode, abPayload);
571 self.unlockTask();
572 return (cbMsg, sOpcode, abPayload);
573
574 def recvAck(self, fNoDataOk = False):
575 """
576 Receives an ACK or error response from the TXS.
577
578 Returns True on success.
579 Returns False on timeout or transport error.
580 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
581 and there are always details of some sort or another.
582 """
583 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
584 if cbMsg is None:
585 return False;
586 sOpcode = sOpcode.strip()
587 if sOpcode == "ACK":
588 return True;
589 return (sOpcode, getSZ(abPayload, 0, sOpcode));
590
591 def recvAckLogged(self, sCommand, fNoDataOk = False):
592 """
593 Wrapper for recvAck and logging.
594 Returns True on success (ACK).
595 Returns False on time, transport error and errors signalled by TXS.
596 """
597 rc = self.recvAck(fNoDataOk);
598 if rc is not True and not fNoDataOk:
599 if rc is False:
600 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
601 else:
602 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
603 rc = False;
604 return rc;
605
606 def recvTrueFalse(self, sCommand):
607 """
608 Receives a TRUE/FALSE response from the TXS.
609 Returns True on TRUE, False on FALSE and None on error/other (logged).
610 """
611 cbMsg, sOpcode, abPayload = self.recvReply();
612 if cbMsg is None:
613 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
614 return None;
615
616 sOpcode = sOpcode.strip()
617 if sOpcode == "TRUE":
618 return True;
619 if sOpcode == "FALSE":
620 return False;
621 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
622 return None;
623
624 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
625 """
626 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
627 """
628 if cMsTimeout is None:
629 cMsTimeout = self.getMsLeft(500);
630 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
631
632 def asyncToSync(self, fnAsync, *aArgs):
633 """
634 Wraps an asynchronous task into a synchronous operation.
635
636 Returns False on failure, task return status on success.
637 """
638 rc = fnAsync(*aArgs);
639 if rc is False:
640 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
641 return rc;
642
643 rc = self.waitForTask(self.cMsTimeout + 5000);
644 if rc is False:
645 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
646 self.cancelTask();
647 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
648 return False;
649
650 rc = self.getResult();
651 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
652 return rc;
653
654 #
655 # Connection tasks.
656 #
657
658 def taskConnect(self, cMsIdleFudge):
659 """Tries to connect to the TXS"""
660 while not self.isCancelled():
661 reporter.log2('taskConnect: connecting ...');
662 rc = self.oTransport.connect(self.getMsLeft(500));
663 if rc is True:
664 reporter.log('taskConnect: succeeded');
665 return self.taskGreet(cMsIdleFudge);
666 if rc is None:
667 reporter.log2('taskConnect: unable to connect');
668 return None;
669 if self.hasTimedOut():
670 reporter.log2('taskConnect: timed out');
671 if not self.fTryConnect:
672 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
673 return False;
674 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
675 if not self.fTryConnect:
676 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
677 return False;
678
679 def taskGreet(self, cMsIdleFudge):
680 """Greets the TXS"""
681 rc = self.sendMsg("HOWDY", ());
682 if rc is True:
683 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
684 if rc is True:
685 while cMsIdleFudge > 0:
686 cMsIdleFudge -= 1000;
687 time.sleep(1);
688 else:
689 self.oTransport.disconnect(self.fTryConnect);
690 return rc;
691
692 def taskBye(self):
693 """Says goodbye to the TXS"""
694 rc = self.sendMsg("BYE");
695 if rc is True:
696 rc = self.recvAckLogged("BYE");
697 self.oTransport.disconnect();
698 return rc;
699
700 def taskUuid(self):
701 """Gets the TXS UUID"""
702 rc = self.sendMsg("UUID");
703 if rc is True:
704 rc = False;
705 cbMsg, sOpcode, abPayload = self.recvReply();
706 if cbMsg is not None:
707 sOpcode = sOpcode.strip()
708 if sOpcode == "ACK UUID":
709 sUuid = getSZ(abPayload, 0);
710 if sUuid is not None:
711 sUuid = '{%s}' % (sUuid,)
712 try:
713 _ = uuid.UUID(sUuid);
714 rc = sUuid;
715 except:
716 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
717 else:
718 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
719 else:
720 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
721 else:
722 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
723 return rc;
724
725 #
726 # Process task
727 # pylint: disable=missing-docstring
728 #
729
730 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
731 # Construct the payload.
732 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
733 for sArg in asArgs:
734 aoPayload.append('%s' % (sArg));
735 aoPayload.append(long(len(asAddEnv)));
736 for sPutEnv in asAddEnv:
737 aoPayload.append('%s' % (sPutEnv));
738 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
739 if utils.isString(o):
740 aoPayload.append(o);
741 elif o is not None:
742 aoPayload.append('|');
743 o.uTxsClientCrc32 = zlib.crc32(b'');
744 else:
745 aoPayload.append('');
746 aoPayload.append('%s' % (sAsUser));
747 aoPayload.append(long(self.cMsTimeout));
748
749 # Kick of the EXEC command.
750 rc = self.sendMsg('EXEC', aoPayload)
751 if rc is True:
752 rc = self.recvAckLogged('EXEC');
753 if rc is True:
754 # Loop till the process completes, feed input to the TXS and
755 # receive output from it.
756 sFailure = "";
757 msPendingInputReply = None;
758 cbMsg, sOpcode, abPayload = (None, None, None);
759 while True:
760 # Pending input?
761 if msPendingInputReply is None \
762 and oStdIn is not None \
763 and not utils.isString(oStdIn):
764 try:
765 sInput = oStdIn.read(65536);
766 except:
767 reporter.errorXcpt('read standard in');
768 sFailure = 'exception reading stdin';
769 rc = None;
770 break;
771 if sInput:
772 # Convert to a byte array before handing it of to sendMsg or the string
773 # will get some zero termination added breaking the CRC (and injecting
774 # unwanted bytes).
775 abInput = array.array('B', sInput.encode('utf-8'));
776 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
777 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
778 if rc is not True:
779 sFailure = 'sendMsg failure';
780 break;
781 msPendingInputReply = base.timestampMilli();
782 continue;
783
784 rc = self.sendMsg('STDINEOS');
785 oStdIn = None;
786 if rc is not True:
787 sFailure = 'sendMsg failure';
788 break;
789 msPendingInputReply = base.timestampMilli();
790
791 # Wait for input (500 ms timeout).
792 if cbMsg is None:
793 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
794 if cbMsg is None:
795 # Check for time out before restarting the loop.
796 # Note! Only doing timeout checking here does mean that
797 # the TXS may prevent us from timing out by
798 # flooding us with data. This is unlikely though.
799 if self.hasTimedOut() \
800 and ( msPendingInputReply is None \
801 or base.timestampMilli() - msPendingInputReply > 30000):
802 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
803 sFailure = 'timeout';
804 rc = None;
805 break;
806 # Check that the connection is OK.
807 if not self.oTransport.isConnectionOk():
808 self.oTransport.disconnect();
809 sFailure = 'disconnected';
810 rc = False;
811 break;
812 continue;
813
814 # Handle the response.
815 sOpcode = sOpcode.rstrip();
816 if sOpcode == 'STDOUT':
817 oOut = oStdOut;
818 elif sOpcode == 'STDERR':
819 oOut = oStdErr;
820 elif sOpcode == 'TESTPIPE':
821 oOut = oTestPipe;
822 else:
823 oOut = None;
824 if oOut is not None:
825 # Output from the process.
826 if len(abPayload) < 4:
827 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
828 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
829 rc = None;
830 break;
831 uStreamCrc32 = getU32(abPayload, 0);
832 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
833 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
834 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
835 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
836 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
837 rc = None;
838 break;
839 try:
840 oOut.write(abPayload[4:]);
841 except:
842 sFailure = 'exception writing %s' % (sOpcode);
843 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
844 rc = None;
845 break;
846 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
847 # Standard input is ignored. Ignore this condition for now.
848 msPendingInputReply = None;
849 reporter.log('taskExecEx: Standard input is ignored... why?');
850 del oStdIn.uTxsClientCrc32;
851 oStdIn = '/dev/null';
852 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
853 and msPendingInputReply is not None:
854 # TXS STDIN error, abort.
855 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
856 msPendingInputReply = None;
857 sFailure = 'TXS is out of memory for std input buffering';
858 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
859 rc = None;
860 break;
861 elif sOpcode == 'ACK' and msPendingInputReply is not None:
862 msPendingInputReply = None;
863 elif sOpcode.startswith('PROC '):
864 # Process status message, handle it outside the loop.
865 rc = True;
866 break;
867 else:
868 sFailure = 'Unexpected opcode %s' % (sOpcode);
869 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
870 rc = None;
871 break;
872 # Clear the message.
873 cbMsg, sOpcode, abPayload = (None, None, None);
874
875 # If we sent an STDIN packet and didn't get a reply yet, we'll give
876 # TXS some 5 seconds to reply to this. If we don't wait here we'll
877 # get screwed later on if we mix it up with the reply to some other
878 # command. Hackish.
879 if msPendingInputReply is not None:
880 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
881 if cbMsg2 is not None:
882 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
883 % (cbMsg2, sOpcode2, abPayload2));
884 msPendingInputReply = None;
885 else:
886 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
887 self.fScrewedUpMsgState = True;
888
889 # Parse the exit status (True), abort (None) or do nothing (False).
890 if rc is True:
891 if sOpcode != 'PROC OK':
892 # Do proper parsing some other day if needed:
893 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
894 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
895 rc = False;
896 else:
897 if rc is None:
898 # Abort it.
899 reporter.log('taskExecEx: sending ABORT...');
900 rc = self.sendMsg('ABORT');
901 while rc is True:
902 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
903 if cbMsg is None:
904 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
905 self.fScrewedUpMsgState = True;
906 break;
907 if sOpcode.startswith('PROC '):
908 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
909 break;
910 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
911 # Check that the connection is OK before looping.
912 if not self.oTransport.isConnectionOk():
913 self.oTransport.disconnect();
914 break;
915
916 # Fake response with the reason why we quit.
917 if sFailure is not None:
918 self.t3oReply = (0, 'EXECFAIL', sFailure);
919 rc = None;
920 else:
921 rc = None;
922
923 # Cleanup.
924 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
925 if o is not None and not utils.isString(o):
926 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
927 # Make sure all files are closed
928 o.close(); # pylint: disable=maybe-no-member
929 reporter.log('taskExecEx: returns %s' % (rc));
930 return rc;
931
932 #
933 # Admin tasks
934 #
935
936 def hlpRebootShutdownWaitForAck(self, sCmd):
937 """Wait for reboot/shutodwn ACK."""
938 rc = self.recvAckLogged(sCmd);
939 if rc is True:
940 # poll a little while for server to disconnect.
941 uMsStart = base.timestampMilli();
942 while self.oTransport.isConnectionOk() \
943 and base.timestampMilli() - uMsStart >= 5000:
944 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
945 break;
946 self.oTransport.disconnect();
947 return rc;
948
949 def taskReboot(self):
950 rc = self.sendMsg('REBOOT');
951 if rc is True:
952 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
953 return rc;
954
955 def taskShutdown(self):
956 rc = self.sendMsg('SHUTDOWN');
957 if rc is True:
958 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
959 return rc;
960
961 #
962 # CD/DVD control tasks.
963 #
964
965 ## TODO
966
967 #
968 # File system tasks
969 #
970
971 def taskMkDir(self, sRemoteDir, fMode):
972 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
973 if rc is True:
974 rc = self.recvAckLogged('MKDIR');
975 return rc;
976
977 def taskMkDirPath(self, sRemoteDir, fMode):
978 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
979 if rc is True:
980 rc = self.recvAckLogged('MKDRPATH');
981 return rc;
982
983 def taskMkSymlink(self, sLinkTarget, sLink):
984 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
985 if rc is True:
986 rc = self.recvAckLogged('MKSYMLNK');
987 return rc;
988
989 def taskRmDir(self, sRemoteDir):
990 rc = self.sendMsg('RMDIR', (sRemoteDir,));
991 if rc is True:
992 rc = self.recvAckLogged('RMDIR');
993 return rc;
994
995 def taskRmFile(self, sRemoteFile):
996 rc = self.sendMsg('RMFILE', (sRemoteFile,));
997 if rc is True:
998 rc = self.recvAckLogged('RMFILE');
999 return rc;
1000
1001 def taskRmSymlink(self, sRemoteSymlink):
1002 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1003 if rc is True:
1004 rc = self.recvAckLogged('RMSYMLNK');
1005 return rc;
1006
1007 def taskRmTree(self, sRemoteTree):
1008 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1009 if rc is True:
1010 rc = self.recvAckLogged('RMTREE');
1011 return rc;
1012
def taskChMod(self, sRemotePath, fMode):
    """
    Sends a CHMOD request with the payload (int(fMode), sRemotePath) and
    then collects the logged acknowledgement.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvAckLogged('CHMOD').
    """
    tPayload = (int(fMode), sRemotePath,);
    fSent = self.sendMsg('CHMOD', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHMOD');
1018
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """
    Sends a CHOWN request with the payload (int(idUser), int(idGroup),
    sRemotePath) and then collects the logged acknowledgement.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvAckLogged('CHOWN').
    """
    tPayload = (int(idUser), int(idGroup), sRemotePath,);
    fSent = self.sendMsg('CHOWN', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHOWN');
1024
def taskIsDir(self, sRemoteDir):
    """
    Sends an ISDIR query for sRemoteDir and then reads the true/false
    reply.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvTrueFalse('ISDIR').
    """
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
1030
def taskIsFile(self, sRemoteFile):
    """
    Sends an ISFILE query for sRemoteFile and then reads the true/false
    reply.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvTrueFalse('ISFILE').
    """
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1036
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends an ISSYMLNK query for sRemoteSymlink and then reads the
    true/false reply.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvTrueFalse('ISSYMLNK').
    """
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1042
1043 #def "STAT "
1044 #def "LSTAT "
1045 #def "LIST "
1046
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the local file sLocalFile to sRemoteFile on the TXS side.

    The local file is opened first so that a missing or unreadable file is
    reported without bothering TXS.  The transfer itself is done by
    taskUploadCommon, which receives fMode and fFallbackOkay unchanged.

    Returns False (error logged) if the local file cannot be opened,
    otherwise whatever taskUploadCommon returns.
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        # Cleanup - also when the common worker raises, so the handle never leaks.
        oLocalFile.close();
1064
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the in-memory content sContent to sRemoteFile.

    The content is wrapped in a small object that offers the read(cbMax)
    method taskUploadCommon expects, and the transfer is delegated to
    taskUploadCommon.  Returns whatever taskUploadCommon returns.
    """
    class InStringFile(object): # pylint: disable=too-few-public-methods
        """ Read-only, file-like view of sContent. """
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """ Returns the next chunk (at most cbMax items), "" once exhausted. """
            offStart = self.off;
            cbRemaining = len(self.sContent) - offStart;
            if cbRemaining == 0:
                return "";
            # The slice never reaches past the end of the content.
            sChunk = self.sContent[offStart:(offStart + min(cbRemaining, cbMax))];
            self.off = offStart + len(sChunk);
            return sChunk;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile, fMode, fFallbackOkay);
1085
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method.  sRemoteFile is the
    destination name sent to TXS.  A non-zero fMode selects the PUT2FILE
    command; fFallbackOkay permits retrying with 'PUT FILE' when TXS
    answers PUT2FILE with 'UNKNOWN'.

    Returns True when the whole stream was sent and acknowledged.  Failures
    detected in the data loop (and a rejected PUT2FILE) yield False; the
    result of recvAckLogged('PUT FILE') or a failed initial sendMsg is
    returned as-is.
    """
    #
    # Command + ACK.
    #
    # Only used the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode == 0:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            # recvAck (not recvAckLogged) so an 'UNKNOWN' reply can be handled quietly below.
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                # Here rc is indexable: rc[0] is compared to 'UNKNOWN', rc[1] is logged as detail.
                if rc[0] == 'UNKNOWN' and fFallbackOkay:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
                else:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                # rc = None marks a local read error; it triggers the ABORT below.
                rc = None;
                break;

            # Convert to array - this is silly!
            abBuf = array.array('B');
            if utils.isString(sRaw):
                for i, _ in enumerate(sRaw):
                    abBuf.append(ord(sRaw[i]));
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.
            # The CRC is cumulative over everything sent so far; an empty read means EOF.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1165
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile from the TXS side into the local file sLocalFile.

    The local file is opened for binary writing and the transfer is done
    by taskDownloadCommon.  When that returns False, the (partial) local
    file is removed again; a failure to remove it is only logged.

    Returns False (error logged) if the local file cannot be opened,
    otherwise whatever taskDownloadCommon returns.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        # Close also when the common worker raises, so the handle never leaks.
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1182
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Downloads sRemoteFile and returns its content as a single string.

    The data is collected chunk by chunk through taskDownloadCommon.
    Chunks that offer a decode() method are decoded with sEncoding, using
    the 'ignore' error handler when fIgnoreEncodingErrors is set and
    'strict' otherwise; other chunks are used unchanged.

    Returns the concatenated string when taskDownloadCommon returned True,
    otherwise the taskDownloadCommon result itself.
    """
    class OutStringFile(object): # pylint: disable=too-few-public-methods
        """ Write-only sink which remembers each chunk it is handed. """
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            self.asContent.append(sBuf);
            return None;

    oSink = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oSink);
    if rc is not True:
        return rc;

    sErrors = 'ignore' if fIgnoreEncodingErrors else 'strict';
    asParts = [sBuf.decode(sEncoding, sErrors) if hasattr(sBuf, 'decode') else sBuf
               for sBuf in oSink.asContent];
    return ''.join(asParts);
1203
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' for sRemoteFile and then processes the 'DATA' /
    'DATA EOF' replies.  The first four payload bytes of each reply carry
    the CRC-32 of the stream so far, which is checked against a locally
    computed value.  The remaining payload bytes are handed to
    oLocalFile.write() and each packet is answered with 'ACK'.

    Validation failures and local write errors are answered with 'NACK'.

    Returns True on success and False on failure.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.
            sOpcode = sOpcode.rstrip();
            if sOpcode not in ('DATA', 'DATA EOF',):
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                abData = abPayload[4:];
                # array.tostring() was removed in Python 3.9 and tobytes() does
                # not exist on Python 2, so pick whichever is available.  With
                # tostring() alone the AttributeError would be swallowed below
                # and misreported as an I/O error.
                if hasattr(abData, 'tobytes'):
                    oLocalFile.write(abData.tobytes());
                else:
                    oLocalFile.write(abData.tostring());
            except:
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1262
def taskPackFile(self, sRemoteFile, sRemoteSource):
    """
    Sends a PKFILE request with the payload (sRemoteFile, sRemoteSource)
    and then collects the logged acknowledgement.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvAckLogged('PKFILE').
    """
    fSent = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('PKFILE');
1268
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends an UNPKFILE request with the payload (sRemoteFile, sRemoteDir)
    and then collects the logged acknowledgement.

    Returns the sendMsg status if it is not True, otherwise the result of
    recvAckLogged('UNPKFILE').
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1274
1275 # pylint: enable=missing-docstring
1276
1277
1278 #
1279 # Public methods - generic task queries
1280 #
1281
def isSuccess(self):
    """
    Checks whether the task completed successfully.

    Returns True when the status string is empty and the task result is
    neither False nor None; otherwise returns False.
    """
    # Take a consistent snapshot of both members under the task lock.
    self.lockTask();
    sCurStatus = self.sStatus;
    oResult = self.oTaskRc;
    self.unlockTask();
    return sCurStatus == "" and oResult is not False and oResult is not None;
1293
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None if not completed yet or no previous task.
    """
    # Take a consistent snapshot of both members under the task lock.
    self.lockTask();
    sCurStatus = self.sStatus;
    oResult = self.oTaskRc;
    self.unlockTask();
    # A non-empty status string yields None instead of the stored result.
    return None if sCurStatus != "" else oResult;
1306
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.
    """
    # Read the member while holding the task lock, hand it out afterwards.
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1316
1317 #
1318 # Public methods - connection.
1319 #
1320
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a disconnect task.

    Starts self.taskBye through startTask with the label "bye", passing
    cMsTimeout and fIgnoreErrors along.

    Returns True on success, False on failure (logged).

    The task returns True on success and False on failure.
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1330
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDisconnect.

    Hands asyncDisconnect and its arguments to asyncToSync and returns
    that call's result.
    """
    return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1334
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Starts self.taskUuid through startTask, passing cMsTimeout and
    fIgnoreErrors along.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # Label the task "uuid"; the previous "bye" label was copied from
    # asyncDisconnect and misidentified this task.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1344
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUuid.

    Hands asyncUuid and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1348
1349 #
1350 # Public methods - execution.
1351 #
1352
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates an exec process task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe specify how to deal with
    these streams.  If None, no special action is taken and the output goes
    to where ever the TXS sends its output, and ditto for input.
        - To send to / read from the bitbucket, pass '/dev/null'.
        - To redirect to/from a file, just specify the remote filename.
        - To append to a file use '>>' followed by the remote filename.
        - To pipe the stream to/from the TXS, specify a file like
          object.  For StdIn a non-blocking read() method is required. For
          the other a write() method is required.  Watch out for deadlock
          conditions between StdIn and StdOut/StdErr/TestPipe piping.
    """
    # The second task argument is always long(0) here.
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
                          (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                           oStdOut, oStdErr, oTestPipe, sAsUser));
1381
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx.

    Note that all four stream arguments default to '/dev/null' here,
    whereas asyncExecEx defaults them to None.  Returns the result of
    asyncToSync.
    """
    return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
                            oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1389
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates an exec process test task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    Standard in is taken from /dev/null.  While both standard output and
    standard error goes directly to reporter.log().  The testpipe is piped
    to reporter.xxxx.

    sPrefix is prepended to the 'stdout' / 'stderr' names given to the
    reporter file wrappers.  With fWithTestPipe set to False the test pipe
    is directed to '/dev/null' instead.
    """

    sStdIn = '/dev/null';
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
    else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type

    # Same worker as asyncExecEx, only with the stream setup prepared above.
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
                          (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1416
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExec.

    Hands asyncExec and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
                            cMsTimeout, fIgnoreErrors);
1422
1423 #
1424 # Public methods - system
1425 #
1426
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a reboot task.

    Starts self.taskReboot (no task arguments) through startTask with the
    label "reboot".

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1437
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncReboot.

    Hands asyncReboot and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1441
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a shutdown task.

    Starts self.taskShutdown (no task arguments) through startTask with
    the label "shutdown".

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1451
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncShutdown.

    Hands asyncShutdown and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1455
1456
1457 #
1458 # Public methods - file system
1459 #
1460
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir task.

    Starts self.taskMkDir through startTask with the label "mkDir" and
    the task arguments (sRemoteDir, long(fMode)).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1470
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDir.

    Hands asyncMkDir, sRemoteDir, long(fMode) and the remaining arguments
    to asyncToSync and returns that call's result.
    """
    return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1474
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir -p task.

    Starts self.taskMkDirPath through startTask with the label
    "mkDirPath" and the task arguments (sRemoteDir, long(fMode)).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1484
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDirPath.

    Hands asyncMkDirPath, sRemoteDir, long(fMode) and the remaining
    arguments to asyncToSync and returns that call's result.
    """
    return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1488
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a symlink task.

    Starts self.taskMkSymlink through startTask with the label
    "mkSymlink" and the task arguments (sLinkTarget, sLink).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1498
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkSymlink.

    Hands asyncMkSymlink and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1502
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmdir task.

    Starts self.taskRmDir through startTask with the label "rmDir" and
    the task argument sRemoteDir.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1512
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmDir.

    Hands asyncRmDir and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1516
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmfile task.

    Starts self.taskRmFile through startTask with the label "rmFile" and
    the task argument sRemoteFile.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1526
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmFile.

    Hands asyncRmFile and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1530
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmsymlink task.

    Starts self.taskRmSymlink through startTask with the label
    "rmSymlink" and the task argument sRemoteSymlink.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1540
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmSymlink.

    Hands asyncRmSymlink and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1544
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmtree task.

    Starts self.taskRmTree through startTask with the label "rmTree" and
    the task argument sRemoteTree.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1554
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmTree.

    Hands asyncRmTree and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1558
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a chmod task.

    Starts self.taskChMod through startTask with the label "chMod" and
    the task arguments (sRemotePath, fMode).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1568
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChMod.

    Hands asyncChMod and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1572
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a chown task.

    Starts self.taskChOwn through startTask with the label "chOwn" and
    the task arguments (sRemotePath, idUser, idGroup).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1582
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn.

    Hands asyncChOwn and its arguments to asyncToSync and returns that
    call's result.
    """
    # Must wrap asyncChOwn: the old code passed asyncChMod, which takes one
    # positional argument less and would have run a chmod task instead.
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1586
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-dir query task.

    Starts self.taskIsDir through startTask with the label "isDir" and
    the task argument sRemoteDir.

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1597
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsDir.

    Hands asyncIsDir and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1601
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-file query task.

    Starts self.taskIsFile through startTask with the label "isFile" and
    the task argument sRemoteFile.

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1612
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsFile.

    Hands asyncIsFile and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1616
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-symbolic-link query task.

    Starts self.taskIsSymlink through startTask with the label
    "isSymlink" and the task argument sRemoteSymlink.

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1627
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsSymlink.

    Hands asyncIsSymlink and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1631
1632 #def "STAT "
1633 #def "LSTAT "
1634 #def "LIST "
1635
1636 @staticmethod
1637 def calcFileXferTimeout(cbFile):
1638 """
1639 Calculates a reasonable timeout for an upload/download given the file size.
1640
1641 Returns timeout in milliseconds.
1642 """
1643 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1644
@staticmethod
def calcUploadTimeout(sLocalFile):
    """
    Calculates a reasonable timeout for an upload given the file (will stat it).

    The size of sLocalFile is fed to Session.calcFileXferTimeout.  If the
    size cannot be determined, 1 MiB is assumed instead.

    Returns timeout in milliseconds.
    """
    try: cbFile = os.path.getsize(sLocalFile);
    except: cbFile = 1024*1024; # Best effort: any failure falls back to 1 MiB.
    return Session.calcFileXferTimeout(cbFile);
1655
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an upload file task.

    Starts self.taskUploadFile through startTask with the label "upload"
    and the task arguments (sLocalFile, sRemoteFile, fMode, fFallbackOkay).

    Unlike syncUploadFile, this method does not derive the timeout from
    the file size; cMsTimeout is used as given.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
                          self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1667
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadFile.

    A cMsTimeout of zero or less is replaced by the value
    calcUploadTimeout computes for sLocalFile.  Returns the result of
    asyncToSync.
    """
    if cMsTimeout <= 0:
        cMsTimeout = self.calcUploadTimeout(sLocalFile);
    return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1673
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Initiates an upload string task.

    Starts self.taskUploadString through startTask with the label
    "uploadString" and the task arguments (sContent, sRemoteFile, fMode,
    fFallbackOkay).  A cMsTimeout of zero or less is replaced by the value
    calcFileXferTimeout computes for len(sContent).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    if cMsTimeout <= 0:
        cMsTimeout = self.calcFileXferTimeout(len(sContent));
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
                          self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1687
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadString.

    A cMsTimeout of zero or less is replaced by the value
    calcFileXferTimeout computes for len(sContent).  Returns the result of
    asyncToSync.
    """
    if cMsTimeout <= 0:
        cMsTimeout = self.calcFileXferTimeout(len(sContent));
    return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1693
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates a download file task.

    Starts self.taskDownloadFile through startTask with the label
    "downloadFile" and the task arguments (sRemoteFile, sLocalFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1703
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDownloadFile.

    Hands asyncDownloadFile and its arguments to asyncToSync and returns
    that call's result.
    """
    return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1707
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a download string task.

    Starts self.taskDownloadString through startTask with the label
    "downloadString" and the task arguments (sRemoteFile, sEncoding,
    fIgnoreEncodingErrors).

    Returns True on success, False on failure (logged).

    The task returns the file content as a string (decoded using
    sEncoding) on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
                          self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1719
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDownloadString.

    Hands asyncDownloadString and its arguments to asyncToSync and returns
    that call's result.
    """
    return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
                            cMsTimeout, fIgnoreErrors);
1725
def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates a packing file/directory task.

    Starts self.taskPackFile through startTask with the label "packFile"
    and the task arguments (sRemoteFile, sRemoteSource).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
                          (sRemoteFile, sRemoteSource));
1736
def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Synchronous version of asyncPackFile.

    Hands asyncPackFile and its arguments to asyncToSync and returns that
    call's result.
    """
    return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1740
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Initiates an unpack file task.

    Starts self.taskUnpackFile through startTask with the label
    "unpackFile" and the task arguments (sRemoteFile, sRemoteDir).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
                          (sRemoteFile, sRemoteDir));
1751
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUnpackFile.

    Hands asyncUnpackFile and its arguments to asyncToSync and returns
    that call's result.
    """
    return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1755
1756
1757class TransportTcp(TransportBase):
1758 """
1759 TCP transport layer for the TXS client session class.
1760 """
1761
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Save the parameters. The session will call us back to make the
    connection later on its worker thread.

    When uPort is None a default is chosen: 5042 if fReversedSetup is
    False, otherwise 5048.
    """
    TransportBase.__init__(self, utils.getCallerName());

    # Resolve the default port before storing anything.
    if uPort is None:
        if fReversedSetup is False:
            uPort = 5042;
        else:
            uPort = 5048;

    # Connection parameters.
    self.sHostname = sHostname;
    self.fReversedSetup = fReversedSetup;
    self.uPort = uPort;

    # Sockets - none exist until a connect is attempted.
    self.oSocket = None;
    self.oWakeupW = None;
    self.oWakeupR = None;

    # Connect / cancel state.
    self.fConnectCanceled = False;
    self.fIsConnecting = False;
    self.oCv = threading.Condition();

    # Read-ahead byte buffer.
    self.abReadAhead = array.array('B');
1778
def toString(self):
    """
    Returns a string with the base class description followed by the
    current values of this transport's members.
    """
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFormat % tValues;
1784
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    Returns True if oXcpt is a socket.error whose errno is EINPROGRESS or
    EWOULDBLOCK, otherwise False.  Never raises: any exception while
    inspecting oXcpt is swallowed and counts as "no".
    """
    try:
        if isinstance(oXcpt, socket.error):
            # Each errno test has its own guard so a failure in one does not skip the other.
            try:
                if oXcpt.errno == errno.EINPROGRESS:
                    return True;
            except: pass;
            # Windows?
            try:
                if oXcpt.errno == errno.EWOULDBLOCK:
                    return True;
            except: pass;
    except:
        pass;
    return False;
1801
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    Returns True if oXcpt is a socket.error whose errno is EWOULDBLOCK or
    EAGAIN, otherwise False.  Never raises: any exception while inspecting
    oXcpt is swallowed and counts as "no".
    """
    try:
        if isinstance(oXcpt, socket.error):
            # Each errno test has its own guard so a failure in one does not skip the other.
            try:
                if oXcpt.errno == errno.EWOULDBLOCK:
                    return True;
            except: pass;
            try:
                if oXcpt.errno == errno.EAGAIN:
                    return True;
            except: pass;
    except:
        pass;
    return False;
1817
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    Returns True if oXcpt is a socket.error whose errno is ECONNRESET or
    ENETRESET, otherwise False.  Never raises: any exception while
    inspecting oXcpt is swallowed and counts as "no".
    """
    try:
        if isinstance(oXcpt, socket.error):
            # Each errno test has its own guard so a failure in one does not skip the other.
            try:
                if oXcpt.errno == errno.ECONNRESET:
                    return True;
            except: pass;
            try:
                if oXcpt.errno == errno.ENETRESET:
                    return True;
            except: pass;
    except:
        pass;
    return False;
1833
1834 def _closeWakeupSockets(self):
1835 """ Closes the wakup sockets. Caller should own the CV. """
1836 oWakeupR = self.oWakeupR;
1837 self.oWakeupR = None;
1838 if oWakeupR is not None:
1839 oWakeupR.close();
1840
1841 oWakeupW = self.oWakeupW;
1842 self.oWakeupW = None;
1843 if oWakeupW is not None:
1844 oWakeupW.close();
1845
1846 return None;
1847
def cancelConnect(self):
    """
    Cancels a connect that may be in progress.

    Sets fConnectCanceled.  If a connect is in progress, the socket is
    detached from the object and closed, and the write end of the wakeup
    socket pair is used to wake up the waiting side before it is shut
    down and closed as well.  Failures while sending on or shutting down
    the wakeup socket are only logged.
    """
    # This is bad stuff.
    # 'with' guarantees that the CV is released even if something below raises.
    with self.oCv:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # socket.send() needs bytes on Python 3; the b'' literal is a plain str on Python 2.
                try: oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try: oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
1870
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    oSocket is bound to (self.sHostname, self.uPort) and put into
    listening mode.  If accept() reports that the operation is in
    progress, select() waits up to cMsTimeout milliseconds on oSocket and
    oWakeupR before accept() is tried once more.

    Returns True once the accept step completed, False if the second
    accept() attempt still reports 'in progress' (timeout), and None on
    failure or cancellation.  Unexpected socket errors from the in-progress
    handling are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        # Name the call that actually failed (this used to claim listen() failed).
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            # Use .errno: exception objects cannot be indexed on Python 3.
            # EBADF after a cancel means cancelConnect closed the socket under us.
            if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptAccept:
            if not self.__isInProgressXcpt(oXcptAccept):
                if oXcptAccept.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    # NOTE(review): if the connect was canceled meanwhile, oClientSocket is
    # neither stored nor closed and True is still returned - confirm that
    # the caller checks fConnectCanceled.
    self.oCv.acquire();
    if not self.fConnectCanceled:
        self.oSocket.close();
        self.oSocket = oClientSocket;
        self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
    self.oCv.release();
    return True;
1943
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Issues a connect() to (self.sHostname, self.uPort) on oSocket.  When the
    connect is reported as being in progress, select() waits up to cMsTimeout
    milliseconds for oSocket to become writable (or for an exceptional
    condition on oSocket or oWakeupR) and SO_ERROR is then queried for the
    outcome.

    Returns True on success and False for the error codes treated as 'try
    again' (refused, unreachable, interrupted, network down, timed out).
    Otherwise the result is whatever was recorded before the error was
    classified: None, or False when SO_ERROR reported a non-zero status.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    fRc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        fRc = True;
    except socket.error as oXcptConnect:
        iErr = oXcptConnect.errno;
        if self.__isInProgressXcpt(oXcptConnect):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcptConnect,));
            try:
                (_, aoWritable, aoFailed) = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR],
                                                          cMsTimeout / 1000.0);
                if not aoWritable and not aoFailed:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iErr = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                fRc = (iErr == 0);
            except socket.error as oXcptSelect:
                iErr = oXcptSelect.errno;
            except:
                iErr = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        # Classify the error unless the connect succeeded after all.
        if fRc is not True:
            if iErr in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH,
                        errno.ETIMEDOUT):
                fRc = False; # try again.
            elif iErr != errno.EBADF or not self.fConnectCanceled:
                # EBADF together with a cancel is expected and stays quiet.
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iErr));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (fRc, iErr));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return fRc;
1981
1982
def connect(self, cMsTimeout):
    """
    Establishes the TCP connection, as client or (reversed setup) as server.

    Creates a non-blocking socket plus, where socket.socketpair() exists, a
    wakeup socket pair that cancelConnect() uses to interrupt the wait.
    Unless the connect was already cancelled, these are published in
    self.oSocket / self.oWakeupR / self.oWakeupW and the actual work is
    delegated to _connectAsServer() or _connectAsClient().

    cMsTimeout is the timeout in milliseconds handed to those workers.

    Returns True on success; otherwise the worker's False/None result, or
    False when the connect was cancelled.  None is also returned when the
    socket could not be created or made non-blocking.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        # Report first so the original exception is logged even if close() fails.
        reporter.fatalXcpt('socket.setblocking(0) failed');
        oSocket.close();
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:    (oWakeupR, oWakeupW) = socket.socketpair();         # pylint: disable=no-member
        except: reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    self.oCv.acquire();
    try:
        fCanceled = self.fConnectCanceled;
        if not fCanceled:
            self.oSocket       = oSocket;
            self.oWakeupW      = oWakeupW;
            self.oWakeupR      = oWakeupR;
            self.fIsConnecting = True;
    finally:
        self.oCv.release();

    if fCanceled:
        # Already cancelled: the sockets were never published in self, so
        # the cleanup below cannot find them and cancelConnect() could not
        # interrupt a wait on them.  Close them here and skip the attempt.
        rc = False;
        for oUnused in (oSocket, oWakeupR, oWakeupW):
            if oUnused is not None:
                try:    oUnused.close();
                except: reporter.logXcpt();
    else:
        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);

    # Update the state and cleanup on failure/cancel.
    self.oCv.acquire();
    try:
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;

        if rc is not True:
            if self.oSocket is not None:
                self.oSocket.close();
                self.oSocket = None;
            self._closeWakeupSockets();
    finally:
        self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
2039
def disconnect(self, fQuiet = False):
    """
    Disconnects from the peer and closes the wakeup sockets.

    If there is a socket, the read-ahead buffer is reset, the socket is
    shut down for writing, drained on a best-effort basis and closed with
    the CV held.  The wakeup sockets are closed in either case.

    fQuiet suppresses the error report when shutdown(SHUT_WR) fails.

    Returns None.
    """
    # Work on a local reference so that a concurrent cancelConnect() setting
    # self.oSocket to None cannot cause an AttributeError below.
    oSocket = self.oSocket;
    if oSocket is not None:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            oSocket.setblocking(0);    # just in case it's not set.
            sData = "1";
            while sData:
                sData = oSocket.recv(16384);
        except:
            # Best effort: ends on would-block (nothing left to drain) or on
            # any socket error, neither of which matters when closing.
            pass;

    # Close it.
    self.oCv.acquire();
    try:
        try:
            if oSocket is not None:
                try:    oSocket.setblocking(1);
                except: pass;   # Best effort; the socket is closed right below.
                self.oSocket = None;
                oSocket.close();
        finally:
            self._closeWakeupSockets();
    finally:
        # Release the CV even when a close() raises.
        self.oCv.release();
2068
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf to the peer within cMsTimeout milliseconds.

    A plain send() is attempted first.  If it sends only part of the buffer,
    or would block, the remainder is sent from a loop that select()s for
    writability with the time that is left.

    Returns True once every byte has been sent.  Returns False when there is
    no connection, on timeout, or on a select()/send() failure; each of these
    is reported via reporter.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # First attempt, without any timeout handling.
    try:
        cbDone = self.oSocket.send(abBuf);
        if cbDone == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbDone = 0;

    # Timed send of whatever is left.
    msBegin = base.timestampMilli();
    while True:
        cMsSpent = base.timestampMilli() - msBegin;
        if cMsSpent > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            return False;

        # Wait for the socket to become writable.
        try:
            (_, aoWritable, aoFailed) = select.select([], [self.oSocket], [self.oSocket],
                                                      (cMsTimeout - cMsSpent) / 1000.0);
            if not aoWritable:
                if aoFailed:
                    reporter.error('TranportTcp.sendBytes: select returned with exception');
                else:
                    reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Push out more of the buffer; a would-block just means waiting again.
        try:
            cbDone += self.oSocket.send(abBuf[cbDone:]);
            if cbDone == len(abBuf):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;
2117
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb items from self.abReadAhead and returns them; the
    buffer must hold at least cb items (asserted).
    """
    abBuffered = self.abReadAhead;
    assert(len(abBuffered) >= cb);
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
2124
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes from the peer within cMsTimeout milliseconds.

    Received data is collected in self.abReadAhead; incomplete data stays
    there when the call gives up.  A plain recv() is attempted first, then a
    loop select()s for readability with the time that is left.

    With fNoDataOk set, a timeout (or a connection reset) while the
    read-ahead buffer is empty is not reported as an error.

    Returns the first cb items of the read-ahead buffer on success and None
    on missing connection, timeout, select()/recv() failure or when the peer
    has shut the connection down (disconnect() is called in that case).
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # First attempt, without any timeout handling.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msBegin = base.timestampMilli();
    while True:
        cMsSpent = base.timestampMilli() - msBegin;
        if cMsSpent > cMsTimeout:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            return None;

        # Wait for the socket to become readable.
        try:
            (aoReadable, _, aoFailed) = select.select([self.oSocket], [], [self.oSocket],
                                                      (cMsTimeout - cMsSpent) / 1000.0);
            if not aoReadable:
                if aoFailed:
                    reporter.error('TranportTcp.recvBytes: select returned with exception');
                elif not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Read more; an empty result means the peer has shut down.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);
2192
def isConnectionOk(self):
    """
    Checks whether the connection still looks usable.

    Returns False when there is no socket, when a zero-timeout select()
    reports an exceptional condition on it, or when the select() or a
    zero byte send() raises.  Returns True otherwise.
    """
    oSocket = self.oSocket;
    if oSocket is None:
        return False;
    try:
        (_, _, aoFailed) = select.select([], [], [oSocket], 0.0);
        if aoFailed:
            return False;

        oSocket.send(array.array('B')); # send zero bytes.
    except:
        return False;
    return True;
2205
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket is readable, waiting up to cMsTimeout
    milliseconds for it.

    Returns False only when select() succeeds and reports nothing to read.
    Returns True when there is something to read and also when select()
    raises.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return bool(aoReadable);
2214
2215
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp is created from sHostname, uPort and fReversedSetup and
    wrapped in a Session together with cMsTimeout, cMsIdleFudge and
    fnProcessEvents.

    The optional fnProcessEvents callback should be set to
    vbox.processPendingEvents or similar.

    Returns the Session object, or None if constructing the transport or the
    session raised (the exception is reported via reporter.errorXcpt).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                       fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2232
2233
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its
    name.

    Same as openTcpSession except that the Session is created with
    fTryConnect = True, so that a connection failure isn't logged as an
    error, and that the call itself isn't logged.

    Returns the Session object, or None if constructing the transport or the
    session raised (the exception is reported via reporter.errorXcpt).
    """
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                       fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2248
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette