VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 89530

Last change on this file since 89530 was 84966, checked in by vboxsync, 5 years ago

Validation Kit/testdriver: Fixed task status for asyncUuid().

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 86.5 KB
Line 
# -*- coding: utf-8 -*-
# $Id: txsclient.py 84966 2020-06-26 10:32:57Z vboxsync $
# pylint: disable=too-many-lines

"""
Test eXecution Service Client.
"""
__copyright__ = \
"""
Copyright (C) 2010-2020 Oracle Corporation

This file is part of VirtualBox Open Source Edition (OSE), as
available from http://www.virtualbox.org. This file is free software;
you can redistribute it and/or modify it under the terms of the GNU
General Public License (GPL) as published by the Free Software
Foundation, in version 2 as it comes in the "COPYING" file of the
VirtualBox OSE distribution. VirtualBox OSE is distributed in the
hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.

The contents of this file may alternatively be used under the terms
of the Common Development and Distribution License Version 1.0
(CDDL) only, as it comes in the "COPYING.CDDL" file of the
VirtualBox OSE distribution, in which case the provisions of the
CDDL are applicable instead of those of the GPL.

You may elect to license modified versions of this file under the
terms and conditions of either the GPL or the CDDL or both.
"""
__version__ = "$Revision: 84966 $"

# Standard Python imports.
import array;
import errno;
import os;
import select;
import socket;
import sys;
import threading;
import time;
import zlib;
import uuid;

# Validation Kit imports.
from common     import utils;
from testdriver import base;
from testdriver import reporter;
from testdriver.base import TdTaskBase;

# Python 3 hacks:
# Python 3 has no separate 'long' type; alias it to int so the long(...)
# conversions and isinstance(o, (long, int)) checks in this module work.
if sys.version_info[0] >= 3:
    long = int;     # pylint: disable=redefined-builtin,invalid-name

#
# Helpers for decoding data received from the TXS.
# These are used by both the Session and Transport classes.
#

def getU32(abData, off):
    """
    Get a U32 field.

    Combines the four items abData[off] .. abData[off + 3] into one integer,
    least significant byte first (little endian).
    """
    return abData[off] \
         + abData[off + 1] * 256 \
         + abData[off + 2] * 65536 \
         + abData[off + 3] * 16777216;

def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    The bytes from off up to (not including) the terminator are decoded
    as UTF-8.

    Returns sDefault if the string is invalid (no terminator found, off
    outside the data, or the bytes fail to decode - the latter is logged).
    """
    cchStr = getSZLen(abData, off);
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)];
        try:
            # array.tostring() was removed in Python 3.9; tobytes() is the
            # replacement but does not exist on Python 2 arrays.
            if hasattr(abStr, 'tobytes'):
                return abStr.tobytes().decode('utf_8');
            return abStr.tostring().decode('utf_8');
        except:
            reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;

def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    Returns -1 if off is outside the data packet (negative or beyond the
    end) or if the string is not properly terminated.
    """
    cbData = len(abData);
    # A negative offset would silently index from the end of the packet,
    # so treat it like any other out-of-range offset.
    if off < 0 or off >= cbData:
        return -1;

    offCur = off;
    while abData[offCur] != 0:
        offCur = offCur + 1;
        if offCur >= cbData:
            return -1;

    return offCur - off;

def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters, the remaining six may be upper case letters,
    digits, dash, underscore or space (padding).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # range() excludes its end value, so the upper bounds are 2 and 8 in
    # order to cover characters #0..#1 and #2..#7 respectively.
    for i in range(0, 2):
        if sSet1.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    for i in range(2, 8):
        if sSet2.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;

#
# Helpers for encoding data sent to the TXS.
#

def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Returns a four item array.array('B'), least significant byte first.
    Floor division and modulo are used, so a negative input (e.g. a signed
    CRC value) comes out as its 32-bit two's complement representation.
    """
    return array.array('B',
                       (  u32              % 256,
                         (u32 // 256)      % 256,
                         (u32 // 65536)    % 256,
                         (u32 // 16777216) % 256) );

def escapeString(sString):
    """
    Does $ escaping of the string so TXS doesn't try do variable expansion.

    Returns a copy of sString with every '$' doubled to '$$'.
    """
    return sString.replace('$', '$$');



class TransportBase(object):
    """
    Base class for the transport layer.

    Sub-classes override connect/disconnect/sendBytes/recvBytes and friends;
    this class implements the message framing on top of them: a 16 byte
    header (u32 length, u32 CRC-32, 8 character opcode) followed by the
    payload, zero padded to a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated    = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy         = 0;
        # Header kept back by recvMsg when the payload could not be received.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode is padded with spaces to 8 characters.  abPayload is the
        already encoded payload (array('B')); None means no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # None stands in for "no payload" so that no mutable array object
        # is shared between calls as a default argument.
        if abPayload is None:
            abPayload = array.array('B');

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32, filled in below.
        try:
            # The opcode is exactly 8 characters here (validated above).
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload, not the length/CRC fields), pad it
        # and send it off.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, reusing one stashed by a previous call whose
        # payload read failed.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg    = getU32(abHdr, 0);
        uCrc32   = getU32(abHdr, 4);
        abOpcode = abHdr[8:16];
        try:
            # array.tostring() was removed in Python 3.9; tobytes() is the
            # replacement but does not exist on Python 2 arrays.
            if hasattr(abOpcode, 'tobytes'):
                sOpcode = abOpcode.tobytes().decode('ascii');
            else:
                sOpcode = abOpcode.tostring().decode('ascii');
        except UnicodeError:
            # Non-ASCII garbage in the opcode field: report it like any
            # other malformed header instead of raising.
            reporter.fatal('recvMsg: invalid opcode encoding: %s' % (abOpcode,));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk :
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32 (a zero CRC field means it is not checked).
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Payload items are encoded by type: strings as zero terminated UTF-8,
        integers as little endian uint32_t, array.array objects verbatim.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'));
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8');
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0);
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);


389class Session(TdTaskBase):
390 """
391 A Test eXecution Service (TXS) client session.
392 """
393
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    This starts by connecting to the TXS and will enter the signalled state
    when connected or the timeout has been reached.

    oTransport:      The transport object used to talk to the TXS.
    cMsTimeout:      Timeout for the initial connect task, in milliseconds.
    cMsIdleFudge:    Passed on to the connect task (taskConnect).
    fTryConnect:     Stored as self.fTryConnect; the connect and greet tasks
                     use it to suppress error reporting.
    fnProcessEvents: Passed on to TdTaskBase.__init__.

    Raises base.GenError if the connect task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);
    self.oTransport         = oTransport;
    self.sStatus            = "";       # '' means idle, otherwise the name of the current task.
    self.cMsTimeout         = 0;
    self.fErr               = True;     # Whether to report errors as error.
    self.msStart            = 0;
    self.oThread            = None;
    self.fnTask             = self.taskDummy;
    self.aTaskArgs          = None;
    self.oTaskRc            = None;
    self.t3oReply           = (None, None, None);
    self.fScrewedUpMsgState = False;
    self.fTryConnect        = fTryConnect;

    if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
        raise base.GenError("startTask failed");

def __del__(self):
    """Make sure to cancel the task when deleted."""
    self.cancelTask();

def toString(self):
    """
    Stringify the session for logging and debugging, extending the
    TdTaskBase.toString() output with the session state members.
    """
    return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
           ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
         % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
            self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);

def taskDummy(self):
    """
    Place holder to catch broken state handling.

    Installed as the initial self.fnTask; always raises.
    """
    raise Exception();

def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks of a new task.

    cMsTimeout:         The task timeout in milliseconds.  Values less than
                        500 ms will be adjusted to 500 ms.  This means it is
                        OK to use negative value.
    fIgnoreErrors:      When set, failures are not reported as errors.
    sStatus:            The task status.
    fnTask:             The method that'll execute the task.
    aArgs:              Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Change status from idle ('') to 'setup' under the lock so that only
    # one caller gets to set up a new task.
    self.lockTask();
    if self.sStatus != "":
        self.unlockTask();
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;
    self.sStatus        = "setup";
    self.oTaskRc        = None;
    self.t3oReply       = (None, None, None);
    self.resetTaskLocked();
    self.unlockTask();

    self.cMsTimeout     = max(cMsTimeout, 500);
    self.fErr           = not fIgnoreErrors;
    self.fnTask         = fnTask;
    self.aTaskArgs      = aArgs;
    self.oThread        = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
    # The 'daemon' attribute replaces the deprecated setDaemon() method.
    self.oThread.daemon = True;
    self.msStart        = base.timestampMilli();

    self.lockTask();
    self.sStatus        = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;

def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate after flagging the task as cancelled.

    Returns success indicator (True/False):
      - True if there is no pending task, or if the task thread has
        terminated by the time the synchronous wait returns.
      - False if a task is being set up or is already being cancelled, if
        fSync is False (completion is not waited for), or if the thread is
        still alive after the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus == "setup":
        self.unlockTask();
        return False;
    if self.sStatus == "cancelled":
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);
    # Thread.isAlive() was removed in Python 3.9; is_alive() is available
    # in all versions.  The cancellation succeeded when the thread is gone.
    return not oThread.is_alive();

def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call.

    Runs self.fnTask(*self.aTaskArgs) unless the task was cancelled before
    it got going, then stores the result in self.oTaskRc, marks the session
    idle again and signals the task.  An exception in the task is logged and
    results in a None task result.
    """
    # Must be bound on the 'cancelled already' path too, it is logged and
    # stored below.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;

def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.

    Returns True if the status is "cancelled", False otherwise.  The status
    is sampled while holding the task lock.
    """
    self.lockTask();
    sStatus = self.sStatus;
    self.unlockTask();
    return sStatus == "cancelled";

def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.

    Returns True when getMsLeft() reports no time left, False otherwise.
    """
    return self.getMsLeft() <= 0;

def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout.

    cMsMin: Lower bound for the returned value.
    cMsMax: Upper bound for the returned value; only applied when positive.

    Returns the number of milliseconds left of self.cMsTimeout counting
    from self.msStart, clamped to the given bounds.
    """
    cMsElapsed = base.timestampMilli() - self.msStart;
    if cMsElapsed < 0:
        # The clock appears to have gone backwards.
        return cMsMin;
    cMsLeft = self.cMsTimeout - cMsElapsed;
    if cMsLeft <= cMsMin:
        return cMsMin;
    if cMsLeft > cMsMax > 0:
        return cMsMax
    return cMsLeft;

def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    so the client can inspect it later on.

    cMsTimeout: Receive timeout in milliseconds; None means the time left
                of the task timeout, but at least 500 ms.
    fNoDataOk:  Passed on to recvMsg.

    Returns the recvMsg three-tuple (length, opcode, payload), which is
    also stored in self.t3oReply.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
    self.lockTask();
    self.t3oReply = (cbMsg, sOpcode, abPayload);
    self.unlockTask();
    return (cbMsg, sOpcode, abPayload);

def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode itself
    is used when the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
    if cbMsg is None:
        return False;
    sOpcode = sOpcode.strip()
    if sOpcode == "ACK":
        return True;
    return (sOpcode, getSZ(abPayload, 0, sOpcode));

def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    sCommand:   The command name, used in the log messages only.
    fNoDataOk:  Passed on to recvAck; when set, failures are not logged.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is not True:
        if not fNoDataOk:
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
            else:
                reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
        # Always collapse the (opcode, details) tuple to False, also when
        # logging is suppressed, so callers only ever see True or False.
        rc = False;
    return rc;

def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command name, used in the log messages only.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;

def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    cMsTimeout: Send timeout in milliseconds; None means the time left of
                the task timeout, but at least 500 ms.

    Returns whatever the transport's sendMsg returns.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);

def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    fnAsync:    The asynchronous method to call.
    aArgs:      The arguments to pass to fnAsync.

    Returns False on failure, task return status on success.
    """
    rc = fnAsync(*aArgs);
    if rc is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return rc;

    # Wait a little longer than the task timeout so the task gets a chance
    # to time out by itself first.
    rc = self.waitForTask(self.cMsTimeout + 5000);
    if rc is False:
        reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
        self.cancelTask();
        #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
        return False;

    rc = self.getResult();
    #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
    return rc;

#
# Connection tasks.
#

def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS.

    Keeps retrying the transport connect until it succeeds, fails fatally,
    the task times out or the task is cancelled.

    Returns the taskGreet result once connected, None on fatal connect
    errors, and False on timeout or cancellation.
    """
    while not self.isCancelled():
        reporter.log2('taskConnect: connecting ...');
        rc = self.oTransport.connect(self.getMsLeft(500));
        if rc is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if rc is None:
            reporter.log2('taskConnect: unable to connect');
            return None;
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;
        # Retryable failure: wait up to a second before trying again.
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);
    if not self.fTryConnect:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
    return False;

def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS.

    Sends HOWDY and waits for the ACK.  On success it sleeps in one second
    steps until cMsIdleFudge milliseconds are used up; on failure the
    transport is disconnected.

    Returns True on success, otherwise the failing send/receive status.
    """
    rc = self.sendMsg("HOWDY", ());
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect);
    if rc is True:
        while cMsIdleFudge > 0:
            cMsIdleFudge -= 1000;
            time.sleep(1);
    else:
        self.oTransport.disconnect(self.fTryConnect);
    return rc;

def taskBye(self):
    """
    Says goodbye to the TXS.

    Sends BYE, waits for the ACK and disconnects the transport regardless
    of the outcome.

    Returns True on success, otherwise the failing send/receive status.
    """
    rc = self.sendMsg("BYE");
    if rc is True:
        rc = self.recvAckLogged("BYE");
    self.oTransport.disconnect();
    return rc;

def taskVer(self):
    """
    Requests version information from TXS.

    Returns the version string on success.
    Returns False if the reply is missing or malformed (logged), otherwise
    the failing sendMsg status.
    """
    rc = self.sendMsg("VER");
    if rc is True:
        rc = False;
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is not None:
            sOpcode = sOpcode.strip();
            if sOpcode == "ACK VER":
                sVer = getSZ(abPayload, 0);
                if sVer is not None:
                    rc = sVer;
            else:
                reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
        else:
            reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
    return rc;

def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply is missing, malformed or not a valid UUID
    (logged), otherwise the failing sendMsg status.
    """
    rc = self.sendMsg("UUID");
    if rc is True:
        rc = False;
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is not None:
            sOpcode = sOpcode.strip()
            if sOpcode == "ACK UUID":
                sUuid = getSZ(abPayload, 0);
                if sUuid is not None:
                    sUuid = '{%s}' % (sUuid,)
                    try:
                        # Only used for validating the string.
                        _ = uuid.UUID(sUuid);
                        rc = sUuid;
                    except:
                        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
                else:
                    reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
            else:
                reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        else:
            reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
    return rc;

#
# Process task
# pylint: disable=missing-docstring
#

def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program on the TXS side, feeding it standard input and
    collecting its output until it terminates.

    sExecName:  The program to execute.
    fFlags:     Flags sent along with the EXEC request.
    asArgs:     The argument strings.
    asAddEnv:   Environment change strings.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either None, a string (sent to the TXS as is), or
                a file like object which is read from (oStdIn) or written
                to (the others).  The objects still referenced at the end
                are closed before returning.
    sAsUser:    User name string sent along with the EXEC request.

    Returns True if the process ended with 'PROC OK', False if it ended
    with some other PROC status, and None on any other failure (in which
    case self.t3oReply may hold a faked 'EXECFAIL' reply with the reason).
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            # '|' asks for the stream to be piped over the connection; each
            # piped stream gets its own running CRC-32.
            aoPayload.append('|');
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure            = "";
        msPendingInputReply = None;
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?
            if     msPendingInputReply is None \
               and oStdIn is not None \
               and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).  Input already read as bytes is used as is, only
                    # text needs encoding.
                    if isinstance(sInput, bytes):
                        abInput = array.array('B', sInput);
                    else:
                        abInput = array.array('B', sInput.encode('utf-8'));
                    oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # NOTE(review): the stdin object is dropped here without being
                # closed, unlike the other streams in the cleanup - confirm intent.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if     self.hasTimedOut() \
                   and (   msPendingInputReply is None \
                        or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                # oStdIn is already None when this is the reply to STDINEOS,
                # so only touch it while it still is a stream object.
                if oStdIn is not None and not utils.isString(oStdIn):
                    del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif  sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
              and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode != 'PROC OK':
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                rc = False;
        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            # The attribute is already gone if the same object was passed
            # for more than one stream.
            if hasattr(o, 'uTxsClientCrc32'):
                del o.uTxsClientCrc32;      # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close();                      # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;

#
# Admin tasks
#

def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    After a successful ACK this polls for up to 5 seconds for the server
    to drop the connection (or for incoming data), then disconnects.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.
        uMsStart = base.timestampMilli();
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;

def taskReboot(self):
    """
    Sends REBOOT and waits for the ACK (and the server going away).
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('REBOOT');
    if rc is True:
        rc = self.hlpRebootShutdownWaitForAck('REBOOT');
    return rc;

def taskShutdown(self):
    """
    Sends SHUTDOWN and waits for the ACK (and the server going away).
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('SHUTDOWN');
    if rc is True:
        rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
    return rc;

#
# CD/DVD control tasks.
#

## TODO

#
# File system tasks
#

def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR (mode + directory path) and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if rc is True:
        rc = self.recvAckLogged('MKDIR');
    return rc;

def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH (mode + directory path) and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if rc is True:
        rc = self.recvAckLogged('MKDRPATH');
    return rc;

def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends MKSYMLNK (link target + link path) and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if rc is True:
        rc = self.recvAckLogged('MKSYMLNK');
    return rc;

def taskRmDir(self, sRemoteDir):
    """
    Sends RMDIR (directory path) and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('RMDIR', (sRemoteDir,));
    if rc is True:
        rc = self.recvAckLogged('RMDIR');
    return rc;

def taskRmFile(self, sRemoteFile):
    """
    Sends RMFILE (file path) and waits for the ACK.
    Returns True on success, otherwise the failing status.
    """
    rc = self.sendMsg('RMFILE', (sRemoteFile,));
    if rc is True:
        rc = self.recvAckLogged('RMFILE');
    return rc;

1019 def taskRmSymlink(self, sRemoteSymlink):
1020 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
1021 if rc is True:
1022 rc = self.recvAckLogged('RMSYMLNK');
1023 return rc;
1024
1025 def taskRmTree(self, sRemoteTree):
1026 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1027 if rc is True:
1028 rc = self.recvAckLogged('RMTREE');
1029 return rc;
1030
1031 def taskChMod(self, sRemotePath, fMode):
1032 rc = self.sendMsg('CHMOD', (int(fMode), sRemotePath,));
1033 if rc is True:
1034 rc = self.recvAckLogged('CHMOD');
1035 return rc;
1036
1037 def taskChOwn(self, sRemotePath, idUser, idGroup):
1038 rc = self.sendMsg('CHOWN', (int(idUser), int(idGroup), sRemotePath,));
1039 if rc is True:
1040 rc = self.recvAckLogged('CHOWN');
1041 return rc;
1042
1043 def taskIsDir(self, sRemoteDir):
1044 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1045 if rc is True:
1046 rc = self.recvTrueFalse('ISDIR');
1047 return rc;
1048
1049 def taskIsFile(self, sRemoteFile):
1050 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1051 if rc is True:
1052 rc = self.recvTrueFalse('ISFILE');
1053 return rc;
1054
1055 def taskIsSymlink(self, sRemoteSymlink):
1056 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1057 if rc is True:
1058 rc = self.recvTrueFalse('ISSYMLNK');
1059 return rc;
1060
1061 #def "STAT "
1062 #def "LSTAT "
1063 #def "LIST "
1064
1065 def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
1066 #
1067 # Open the local file (make sure it exist before bothering TXS) and
1068 # tell TXS that we want to upload a file.
1069 #
1070 try:
1071 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1072 except:
1073 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1074 return False;
1075
1076 # Common cause with taskUploadStr
1077 rc = self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
1078
1079 # Cleanup.
1080 oLocalFile.close();
1081 return rc;
1082
1083 def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
1084 # Wrap sContent in a file like class.
1085 class InStringFile(object): # pylint: disable=too-few-public-methods
1086 def __init__(self, sContent):
1087 self.sContent = sContent;
1088 self.off = 0;
1089
1090 def read(self, cbMax):
1091 cbLeft = len(self.sContent) - self.off;
1092 if cbLeft == 0:
1093 return "";
1094 if cbLeft <= cbMax:
1095 sRet = self.sContent[self.off:(self.off + cbLeft)];
1096 else:
1097 sRet = self.sContent[self.off:(self.off + cbMax)];
1098 self.off = self.off + len(sRet);
1099 return sRet;
1100
1101 oLocalString = InStringFile(sContent);
1102 return self.taskUploadCommon(oLocalString, sRemoteFile, fMode, fFallbackOkay);
1103
    def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
        """
        Common worker used by taskUploadFile and taskUploadString.

        oLocalFile only needs a read(cbMax) method.  The content is sent as
        'DATA' packets, each carrying the running CRC-32 of the stream so
        far, and is terminated by a 'DATA EOF' packet.

        Returns True on success and False on failure.  Internally rc is None
        while an ABORT still has to be sent (local read error).
        """
        #
        # Command + ACK.
        #
        # Only used the new PUT2FILE command if we've got a non-zero mode mask.
        # Fall back on the old command if the new one is not known by the TXS.
        #
        if fMode == 0:
            rc = self.sendMsg('PUT FILE', (sRemoteFile,));
            if rc is True:
                rc = self.recvAckLogged('PUT FILE');
        else:
            rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
            if rc is True:
                # Not using recvAckLogged here since an 'UNKNOWN' reply is
                # handled by the fallback below rather than being an error.
                rc = self.recvAck();
                if rc is False:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
                elif rc is not True:
                    # rc is indexed as (opcode, details) here.
                    if rc[0] == 'UNKNOWN' and fFallbackOkay:
                        # Fallback:
                        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                        if rc is True:
                            rc = self.recvAckLogged('PUT FILE');
                    else:
                        reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                        rc = False;
        if rc is True:
            #
            # Push data packets until eof.
            #
            uMyCrc32 = zlib.crc32(b'');
            while True:
                # Read up to 64 KB of data.
                try:
                    sRaw = oLocalFile.read(65536);
                except:
                    # Local I/O error: leave the loop with rc=None so ABORT is sent below.
                    rc = None;
                    break;

                # Convert to array - this is silly!
                abBuf = array.array('B');
                if utils.isString(sRaw):
                    # One byte per character; ord() values must fit in a byte.
                    for i, _ in enumerate(sRaw):
                        abBuf.append(ord(sRaw[i]));
                else:
                    abBuf.extend(sRaw);
                sRaw = None;

                # Update the file stream CRC and send it off.
                uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
                if not abBuf:
                    # An empty read means end of input.
                    rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
                else:
                    rc = self.sendMsg('DATA    ', (long(uMyCrc32 & 0xffffffff), abBuf));
                if rc is False:
                    break;

                # Wait for the reply.
                rc = self.recvAck();
                if rc is not True:
                    if rc is False:
                        reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                    else:
                        reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                        rc = False;
                    break;

                # EOF?
                if not abBuf:
                    break;

            # Send ABORT on ACK and I/O errors.
            if rc is None:
                rc = self.sendMsg('ABORT');
                if rc is True:
                    self.recvAckLogged('ABORT');
                rc = False;
        return rc;
1183
1184 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1185 try:
1186 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1187 except:
1188 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1189 return False;
1190
1191 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1192
1193 oLocalFile.close();
1194 if rc is False:
1195 try:
1196 os.remove(sLocalFile);
1197 except:
1198 reporter.errorXcpt();
1199 return rc;
1200
1201 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1202 # Wrap sContent in a file like class.
1203 class OutStringFile(object): # pylint: disable=too-few-public-methods
1204 def __init__(self):
1205 self.asContent = [];
1206
1207 def write(self, sBuf):
1208 self.asContent.append(sBuf);
1209 return None;
1210
1211 oLocalString = OutStringFile();
1212 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1213 if rc is True:
1214 rc = '';
1215 for sBuf in oLocalString.asContent:
1216 if hasattr(sBuf, 'decode'):
1217 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1218 else:
1219 rc += sBuf;
1220 return rc;
1221
1222 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1223 """Common worker for taskDownloadFile and taskDownloadString."""
1224 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1225 if rc is True:
1226 #
1227 # Process data packets until eof.
1228 #
1229 uMyCrc32 = zlib.crc32(b'');
1230 while rc is True:
1231 cbMsg, sOpcode, abPayload = self.recvReply();
1232 if cbMsg is None:
1233 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1234 rc = None;
1235 break;
1236
1237 # Validate.
1238 sOpcode = sOpcode.rstrip();
1239 if sOpcode not in ('DATA', 'DATA EOF',):
1240 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1241 % (sOpcode, getSZ(abPayload, 0, "None")));
1242 rc = False;
1243 break;
1244 if sOpcode == 'DATA' and len(abPayload) < 4:
1245 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1246 rc = None;
1247 break;
1248 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1249 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1250 rc = None;
1251 break;
1252
1253 # Check the CRC (common for both packets).
1254 uCrc32 = getU32(abPayload, 0);
1255 if sOpcode == 'DATA':
1256 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1257 if uCrc32 != (uMyCrc32 & 0xffffffff):
1258 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1259 % (hex(uMyCrc32), hex(uCrc32)));
1260 rc = None;
1261 break;
1262 if sOpcode == 'DATA EOF':
1263 rc = self.sendMsg('ACK');
1264 break;
1265
1266 # Finally, push the data to the file.
1267 try:
1268 oLocalFile.write(abPayload[4:].tostring());
1269 except:
1270 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1271 rc = None;
1272 break;
1273 rc = self.sendMsg('ACK');
1274
1275 # Send NACK on validation and I/O errors.
1276 if rc is None:
1277 rc = self.sendMsg('NACK');
1278 rc = False;
1279 return rc;
1280
1281 def taskPackFile(self, sRemoteFile, sRemoteSource):
1282 rc = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
1283 if rc is True:
1284 rc = self.recvAckLogged('PKFILE');
1285 return rc;
1286
1287 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1288 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1289 if rc is True:
1290 rc = self.recvAckLogged('UNPKFILE');
1291 return rc;
1292
1293 # pylint: enable=missing-docstring
1294
1295
1296 #
1297 # Public methods - generic task queries
1298 #
1299
1300 def isSuccess(self):
1301 """Returns True if the task completed successfully, otherwise False."""
1302 self.lockTask();
1303 sStatus = self.sStatus;
1304 oTaskRc = self.oTaskRc;
1305 self.unlockTask();
1306 if sStatus != "":
1307 return False;
1308 if oTaskRc is False or oTaskRc is None:
1309 return False;
1310 return True;
1311
1312 def getResult(self):
1313 """
1314 Returns the result of a completed task.
1315 Returns None if not completed yet or no previous task.
1316 """
1317 self.lockTask();
1318 sStatus = self.sStatus;
1319 oTaskRc = self.oTaskRc;
1320 self.unlockTask();
1321 if sStatus != "":
1322 return None;
1323 return oTaskRc;
1324
1325 def getLastReply(self):
1326 """
1327 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1328 Returns a None, None, None three-tuple if there was no last reply.
1329 """
1330 self.lockTask();
1331 t3oReply = self.t3oReply;
1332 self.unlockTask();
1333 return t3oReply;
1334
1335 #
1336 # Public methods - connection.
1337 #
1338
1339 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1340 """
1341 Initiates a disconnect task.
1342
1343 Returns True on success, False on failure (logged).
1344
1345 The task returns True on success and False on failure.
1346 """
1347 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1348
1349 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1350 """Synchronous version."""
1351 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1352
1353 def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1354 """
1355 Initiates a task for getting the TXS version information.
1356
1357 Returns True on success, False on failure (logged).
1358
1359 The task returns the version string on success and False on failure.
1360 """
1361 return self.startTask(cMsTimeout, fIgnoreErrors, "ver", self.taskVer);
1362
1363 def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
1364 """Synchronous version."""
1365 return self.asyncToSync(self.asyncVer, cMsTimeout, fIgnoreErrors);
1366
1367 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1368 """
1369 Initiates a task for getting the TXS UUID.
1370
1371 Returns True on success, False on failure (logged).
1372
1373 The task returns UUID string (in {}) on success and False on failure.
1374 """
1375 return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1376
1377 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1378 """Synchronous version."""
1379 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1380
1381 #
1382 # Public methods - execution.
1383 #
1384
1385 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1386 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1387 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1388 """
1389 Initiates a exec process task.
1390
1391 Returns True on success, False on failure (logged).
1392
1393 The task returns True if the process exited normally with status code 0.
1394 The task returns None if on failure prior to executing the process, and
1395 False if the process exited with a different status or in an abnormal
1396 manner. Both None and False are logged of course and further info can
1397 also be obtained by getLastReply().
1398
1399 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1400 these streams. If None, no special action is taken and the output goes
1401 to where ever the TXS sends its output, and ditto for input.
1402 - To send to / read from the bitbucket, pass '/dev/null'.
1403 - To redirect to/from a file, just specify the remote filename.
1404 - To append to a file use '>>' followed by the remote filename.
1405 - To pipe the stream to/from the TXS, specify a file like
1406 object. For StdIn a non-blocking read() method is required. For
1407 the other a write() method is required. Watch out for deadlock
1408 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1409 """
1410 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1411 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1412 oStdOut, oStdErr, oTestPipe, sAsUser));
1413
1414 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1415 oStdIn = '/dev/null', oStdOut = '/dev/null',
1416 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1417 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1418 """Synchronous version."""
1419 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1420 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1421
1422 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1423 cMsTimeout = 3600000, fIgnoreErrors = False):
1424 """
1425 Initiates a exec process test task.
1426
1427 Returns True on success, False on failure (logged).
1428
1429 The task returns True if the process exited normally with status code 0.
1430 The task returns None if on failure prior to executing the process, and
1431 False if the process exited with a different status or in an abnormal
1432 manner. Both None and False are logged of course and further info can
1433 also be obtained by getLastReply().
1434
1435 Standard in is taken from /dev/null. While both standard output and
1436 standard error goes directly to reporter.log(). The testpipe is piped
1437 to reporter.xxxx.
1438 """
1439
1440 sStdIn = '/dev/null';
1441 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1442 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1443 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1444 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1445
1446 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1447 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1448
1449 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1450 cMsTimeout = 3600000, fIgnoreErrors = False):
1451 """Synchronous version."""
1452 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1453 cMsTimeout, fIgnoreErrors);
1454
1455 #
1456 # Public methods - system
1457 #
1458
1459 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1460 """
1461 Initiates a reboot task.
1462
1463 Returns True on success, False on failure (logged).
1464
1465 The task returns True on success, False on failure (logged). The
1466 session will be disconnected on successful task completion.
1467 """
1468 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1469
1470 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1471 """Synchronous version."""
1472 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1473
1474 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1475 """
1476 Initiates a shutdown task.
1477
1478 Returns True on success, False on failure (logged).
1479
1480 The task returns True on success, False on failure (logged).
1481 """
1482 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1483
1484 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1485 """Synchronous version."""
1486 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1487
1488
1489 #
1490 # Public methods - file system
1491 #
1492
1493 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1494 """
1495 Initiates a mkdir task.
1496
1497 Returns True on success, False on failure (logged).
1498
1499 The task returns True on success, False on failure (logged).
1500 """
1501 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1502
1503 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1504 """Synchronous version."""
1505 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1506
1507 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1508 """
1509 Initiates a mkdir -p task.
1510
1511 Returns True on success, False on failure (logged).
1512
1513 The task returns True on success, False on failure (logged).
1514 """
1515 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1516
1517 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1518 """Synchronous version."""
1519 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1520
1521 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1522 """
1523 Initiates a symlink task.
1524
1525 Returns True on success, False on failure (logged).
1526
1527 The task returns True on success, False on failure (logged).
1528 """
1529 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1530
1531 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1532 """Synchronous version."""
1533 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1534
1535 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1536 """
1537 Initiates a rmdir task.
1538
1539 Returns True on success, False on failure (logged).
1540
1541 The task returns True on success, False on failure (logged).
1542 """
1543 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1544
1545 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1546 """Synchronous version."""
1547 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1548
1549 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1550 """
1551 Initiates a rmfile task.
1552
1553 Returns True on success, False on failure (logged).
1554
1555 The task returns True on success, False on failure (logged).
1556 """
1557 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1558
1559 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1560 """Synchronous version."""
1561 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1562
1563 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1564 """
1565 Initiates a rmsymlink task.
1566
1567 Returns True on success, False on failure (logged).
1568
1569 The task returns True on success, False on failure (logged).
1570 """
1571 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1572
1573 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1574 """Synchronous version."""
1575 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1576
1577 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1578 """
1579 Initiates a rmtree task.
1580
1581 Returns True on success, False on failure (logged).
1582
1583 The task returns True on success, False on failure (logged).
1584 """
1585 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1586
1587 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1588 """Synchronous version."""
1589 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1590
1591 def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1592 """
1593 Initiates a chmod task.
1594
1595 Returns True on success, False on failure (logged).
1596
1597 The task returns True on success, False on failure (logged).
1598 """
1599 return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, (sRemotePath, fMode));
1600
1601 def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
1602 """Synchronous version."""
1603 return self.asyncToSync(self.asyncChMod, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1604
1605 def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1606 """
1607 Initiates a chown task.
1608
1609 Returns True on success, False on failure (logged).
1610
1611 The task returns True on success, False on failure (logged).
1612 """
1613 return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, (sRemotePath, idUser, idGroup));
1614
1615 def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
1616 """Synchronous version."""
1617 return self.asyncToSync(self.asyncChMod, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1618
1619 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1620 """
1621 Initiates a is-dir query task.
1622
1623 Returns True on success, False on failure (logged).
1624
1625 The task returns True if it's a directory, False if it isn't, and
1626 None on error (logged).
1627 """
1628 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1629
1630 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1631 """Synchronous version."""
1632 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1633
1634 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1635 """
1636 Initiates a is-file query task.
1637
1638 Returns True on success, False on failure (logged).
1639
1640 The task returns True if it's a file, False if it isn't, and None on
1641 error (logged).
1642 """
1643 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1644
1645 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1646 """Synchronous version."""
1647 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1648
1649 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1650 """
1651 Initiates a is-symbolic-link query task.
1652
1653 Returns True on success, False on failure (logged).
1654
1655 The task returns True if it's a symbolic linke, False if it isn't, and
1656 None on error (logged).
1657 """
1658 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1659
1660 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1661 """Synchronous version."""
1662 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1663
1664 #def "STAT "
1665 #def "LSTAT "
1666 #def "LIST "
1667
1668 @staticmethod
1669 def calcFileXferTimeout(cbFile):
1670 """
1671 Calculates a reasonable timeout for an upload/download given the file size.
1672
1673 Returns timeout in milliseconds.
1674 """
1675 return 30000 + cbFile / 32; # 32 KiB/s (picked out of thin air)
1676
1677 @staticmethod
1678 def calcUploadTimeout(sLocalFile):
1679 """
1680 Calculates a reasonable timeout for an upload given the file (will stat it).
1681
1682 Returns timeout in milliseconds.
1683 """
1684 try: cbFile = os.path.getsize(sLocalFile);
1685 except: cbFile = 1024*1024;
1686 return Session.calcFileXferTimeout(cbFile);
1687
1688 def asyncUploadFile(self, sLocalFile, sRemoteFile,
1689 fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
1690 """
1691 Initiates a download query task.
1692
1693 Returns True on success, False on failure (logged).
1694
1695 The task returns True on success, False on failure (logged).
1696 """
1697 return self.startTask(cMsTimeout, fIgnoreErrors, "upload",
1698 self.taskUploadFile, (sLocalFile, sRemoteFile, fMode, fFallbackOkay));
1699
1700 def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1701 """Synchronous version."""
1702 if cMsTimeout <= 0:
1703 cMsTimeout = self.calcUploadTimeout(sLocalFile);
1704 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1705
1706 def asyncUploadString(self, sContent, sRemoteFile,
1707 fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1708 """
1709 Initiates a upload string task.
1710
1711 Returns True on success, False on failure (logged).
1712
1713 The task returns True on success, False on failure (logged).
1714 """
1715 if cMsTimeout <= 0:
1716 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1717 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString",
1718 self.taskUploadString, (sContent, sRemoteFile, fMode, fFallbackOkay));
1719
1720 def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
1721 """Synchronous version."""
1722 if cMsTimeout <= 0:
1723 cMsTimeout = self.calcFileXferTimeout(len(sContent));
1724 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
1725
1726 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1727 """
1728 Initiates a download file task.
1729
1730 Returns True on success, False on failure (logged).
1731
1732 The task returns True on success, False on failure (logged).
1733 """
1734 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1735
1736 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
1737 """Synchronous version."""
1738 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1739
1740 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1741 cMsTimeout = 30000, fIgnoreErrors = False):
1742 """
1743 Initiates a download string task.
1744
1745 Returns True on success, False on failure (logged).
1746
1747 The task returns a byte string on success, False on failure (logged).
1748 """
1749 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1750 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1751
1752 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1753 cMsTimeout = 30000, fIgnoreErrors = False):
1754 """Synchronous version."""
1755 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1756 cMsTimeout, fIgnoreErrors);
1757
1758 def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1759 """
1760 Initiates a packing file/directory task.
1761
1762 Returns True on success, False on failure (logged).
1763
1764 The task returns True on success, False on failure (logged).
1765 """
1766 return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile,
1767 (sRemoteFile, sRemoteSource));
1768
1769 def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
1770 """Synchronous version."""
1771 return self.asyncToSync(self.asyncPackFile, sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
1772
1773 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1774 """
1775 Initiates a unpack file task.
1776
1777 Returns True on success, False on failure (logged).
1778
1779 The task returns True on success, False on failure (logged).
1780 """
1781 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1782 (sRemoteFile, sRemoteDir));
1783
1784 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
1785 """Synchronous version."""
1786 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1787
1788
1789class TransportTcp(TransportBase):
1790 """
1791 TCP transport layer for the TXS client session class.
1792 """
1793
1794 def __init__(self, sHostname, uPort, fReversedSetup):
1795 """
1796 Save the parameters. The session will call us back to make the
1797 connection later on its worker thread.
1798 """
1799 TransportBase.__init__(self, utils.getCallerName());
1800 self.sHostname = sHostname;
1801 self.fReversedSetup = fReversedSetup;
1802 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1803 self.oSocket = None;
1804 self.oWakeupW = None;
1805 self.oWakeupR = None;
1806 self.fConnectCanceled = False;
1807 self.fIsConnecting = False;
1808 self.oCv = threading.Condition();
1809 self.abReadAhead = array.array('B');
1810
1811 def toString(self):
1812 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1813 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1814 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1815 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1816
1817 def __isInProgressXcpt(self, oXcpt):
1818 """ In progress exception? """
1819 try:
1820 if isinstance(oXcpt, socket.error):
1821 try:
1822 if oXcpt.errno == errno.EINPROGRESS:
1823 return True;
1824 except: pass;
1825 # Windows?
1826 try:
1827 if oXcpt.errno == errno.EWOULDBLOCK:
1828 return True;
1829 except: pass;
1830 except:
1831 pass;
1832 return False;
1833
1834 def __isWouldBlockXcpt(self, oXcpt):
1835 """ Would block exception? """
1836 try:
1837 if isinstance(oXcpt, socket.error):
1838 try:
1839 if oXcpt.errno == errno.EWOULDBLOCK:
1840 return True;
1841 except: pass;
1842 try:
1843 if oXcpt.errno == errno.EAGAIN:
1844 return True;
1845 except: pass;
1846 except:
1847 pass;
1848 return False;
1849
1850 def __isConnectionReset(self, oXcpt):
1851 """ Connection reset by Peer or others. """
1852 try:
1853 if isinstance(oXcpt, socket.error):
1854 try:
1855 if oXcpt.errno == errno.ECONNRESET:
1856 return True;
1857 except: pass;
1858 try:
1859 if oXcpt.errno == errno.ENETRESET:
1860 return True;
1861 except: pass;
1862 except:
1863 pass;
1864 return False;
1865
1866 def _closeWakeupSockets(self):
1867 """ Closes the wakup sockets. Caller should own the CV. """
1868 oWakeupR = self.oWakeupR;
1869 self.oWakeupR = None;
1870 if oWakeupR is not None:
1871 oWakeupR.close();
1872
1873 oWakeupW = self.oWakeupW;
1874 self.oWakeupW = None;
1875 if oWakeupW is not None:
1876 oWakeupW.close();
1877
1878 return None;
1879
def cancelConnect(self):
    """
    Cancels a connect() that may be in progress.

    Sets self.fConnectCanceled and, if self.fIsConnecting is set, closes the
    connecting socket and pokes the write end of the wakeup socket pair so a
    select() waiting on the read end returns.  All of this is done while
    holding self.oCv.
    """
    # This is bad stuff.
    self.oCv.acquire();
    try:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # Must be bytes: socket.send() rejects str on Python 3, which
                # would turn the wakeup message into a logged TypeError.
                try:    oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try:    oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
    finally:
        # Never leave the CV locked, not even if logging or close() raises.
        self.oCv.release();
1902
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (self.sHostname, self.uPort), listens and accepts a
    single connection.  If the first accept() reports "in progress", the
    method waits up to cMsTimeout milliseconds in select() on both oSocket
    and oWakeupR and then tries accept() once more.

    Returns True once a client was accepted (the accepted socket is only
    stored in self.oSocket if the connect was not canceled meanwhile), False
    if the second accept() still reports "in progress" (timed out), and None
    on failure or cancellation.  Socket errors from accept()/select() that
    are neither "in progress" nor a cancel-induced EBADF are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcptFirst:
        if not self.__isInProgressXcpt(oXcptFirst):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcptFirst,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcpt:
            # Classify *this* failure, not the first one: the first exception
            # is always "in progress" here, so testing it would make the
            # cancel/re-raise branch below unreachable.
            if not self.__isInProgressXcpt(oXcpt):
                if oXcpt.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    try:
        if not self.fConnectCanceled:
            self.oSocket.close();
            self.oSocket = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
            oClientSocket = None; # Ownership moved to self.oSocket.
    finally:
        self.oCv.release();

    # Canceled after the accept: nobody will ever use the client socket, so
    # close it here instead of leaking it.
    if oClientSocket is not None:
        try:    oClientSocket.close();
        except: reporter.logXcpt();
    return True;
1975
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Calls connect() on oSocket towards (self.sHostname, self.uPort).  When
    that reports "in progress", waits up to cMsTimeout milliseconds in
    select() for oSocket to become writable (oWakeupR is watched too) and
    then reads SO_ERROR to get the outcome.

    Returns True when connected; False when SO_ERROR came back non-zero or
    the error is one of those the code marks as "try again"; None otherwise.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    fRc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        fRc = True;
    except socket.error as oXcpt:
        iErr = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                (_, aoWritable, aoFailed) = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR],
                                                          cMsTimeout / 1000.0);
                if not aoWritable and not aoFailed:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iErr = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                fRc = iErr == 0;
            except socket.error as oXcpt2:
                iErr = oXcpt2.errno;
            except:
                iErr = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        # Classify the outcome unless we are connected.
        if fRc is not True:
            if iErr in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN,
                        errno.ENETUNREACH, errno.ETIMEDOUT):
                fRc = False; # try again.
            elif iErr != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iErr));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (fRc, iErr));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return fRc;
2013
2014
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket plus an optional wakeup socket pair and
    performs one connection attempt, as server when self.fReversedSetup is
    set and as client otherwise.

    cMsTimeout is the number of milliseconds handed to the connect helper
    for waiting.

    Returns whatever the helper returned, except that True is turned into
    False if the connect was canceled meanwhile.  Returns None if the socket
    could not be created or configured, and False without attempting
    anything if the connect had already been canceled.  Unless the result is
    True, self.oSocket and the wakeup sockets are closed before returning.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:    (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
        except: reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    self.oCv.acquire();
    try:
        fCanceled = self.fConnectCanceled;
        if not fCanceled:
            self.oSocket = oSocket;
            self.oWakeupW = oWakeupW;
            self.oWakeupR = oWakeupR;
            self.fIsConnecting = True;
    finally:
        self.oCv.release();

    if fCanceled:
        # Canceled before we got going.  The fresh sockets were never stored
        # in self, so the cleanup below cannot reach them; close them here
        # and skip the pointless connection attempt.
        rc = False;
        for oUnowned in (oSocket, oWakeupR, oWakeupW):
            if oUnowned is not None:
                try:    oUnowned.close();
                except: reporter.logXcpt();
    else:
        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
    oSocket = None;

    # Update the state and cleanup on failure/cancel.
    self.oCv.acquire();
    try:
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;

        if rc is not True:
            if self.oSocket is not None:
                self.oSocket.close();
                self.oSocket = None;
            self._closeWakeupSockets();
    finally:
        self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
2071
def disconnect(self, fQuiet = False):
    """
    Shuts down and closes the connection socket (if any) and the wakeup
    sockets.

    The socket is first shut down for writing and drained in non-blocking
    mode, then switched back to blocking mode and closed while holding
    self.oCv.  The read-ahead buffer is reset when there is a socket.

    fQuiet suppresses the error report when shutdown(SHUT_WR) fails.
    """
    # Work on a snapshot so a concurrent change of self.oSocket (see
    # cancelConnect, which sets it to None) cannot make us call methods on None.
    oSocket = self.oSocket;
    if oSocket is not None:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            oSocket.setblocking(0); # just in case it's not set.
            while oSocket.recv(16384):
                pass;
        except:
            pass;

    # Close it.
    self.oCv.acquire();
    try:
        if oSocket is not None:
            self.oSocket = None;
            try:    oSocket.setblocking(1);
            except: pass;
            oSocket.close();
        self._closeWakeupSockets();
    finally:
        # The CV must be released even if close() raises.
        self.oCv.release();
2100
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf over self.oSocket.

    A single send() is tried first.  If that does not get everything out
    (or would block), the method keeps waiting in select() for the socket to
    become writable and sending the remainder until cMsTimeout milliseconds
    have passed.

    Returns True when every byte was sent, False otherwise (no connection,
    send/select failure or timeout, all of which are reported as errors).
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbSent = self.oSocket.send(abBuf);
        if cbSent == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbSent = 0;

    # Do a timed send.
    msStart = base.timestampMilli();
    while True:
        cMsLeft = cMsTimeout - (base.timestampMilli() - msStart);
        if cMsLeft < 0:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            break;

        # Wait for the socket to become writable.
        try:
            (_, aoWritable, aoFailed) = select.select([], [self.oSocket], [self.oSocket], cMsLeft / 1000.0);
            if not aoWritable:
                if aoFailed:
                    reporter.error('TranportTcp.sendBytes: select returned with exception');
                else:
                    reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                break;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            break;

        # Try send more.
        try:
            cbSent += self.oSocket.send(abBuf[cbSent:]);
            if cbSent == len(abBuf):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                break;

    return False;
2149
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb items from self.abReadAhead and returns them.  The
    buffer must hold at least cb items (asserted).
    """
    abBuffered = self.abReadAhead;
    assert(len(abBuffered) >= cb);
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
2156
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, using self.abReadAhead as staging buffer.

    One recv() is tried up front without any waiting.  If the buffer is
    still short, the method loops on select() + recv() until it has cb
    bytes or cMsTimeout milliseconds have passed.

    fNoDataOk silences the error reports for timeouts and connection resets
    as long as nothing at all has been buffered.

    Returns the cb bytes taken from the read-ahead buffer, or None on
    failure/timeout.  When recv() returns no data in the wait loop, the
    connection is reported as shut down and disconnect() is called.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msStart = base.timestampMilli();
    while True:
        cMsLeft = cMsTimeout - (base.timestampMilli() - msStart);
        if cMsLeft < 0:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            break;

        # Wait for the socket to become readable.
        try:
            (aoReadable, _, aoFailed) = select.select([self.oSocket], [], [self.oSocket], cMsLeft / 1000.0);
            if not aoReadable:
                if aoFailed:
                    reporter.error('TranportTcp.recvBytes: select returned with exception');
                elif not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                break;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            break;

        # Try read more.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                # Stay quiet only for a connection reset with nothing buffered when the caller accepts no data.
                fQuiet = fNoDataOk and self.__isConnectionReset(oXcpt) and not self.abReadAhead;
                if not fQuiet:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                break;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

    #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
    return None;
2224
def isConnectionOk(self):
    """
    Checks whether the connection still looks usable.

    Returns False when there is no socket, when a zero-timeout select()
    flags the socket as exceptional, or when the select() or the zero byte
    probe send() raises.  Returns True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        (_, _, aoFailed) = select.select([], [], [self.oSocket], 0.0);
        fOk = not aoFailed;
        if fOk:
            self.oSocket.send(array.array('B')); # send zero bytes.
    except:
        fOk = False;
    return fOk;
2237
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether self.oSocket is readable, waiting up to cMsTimeout
    milliseconds in select().

    Returns False only when select() completes without flagging the socket
    as readable.  A failing select() yields True, just like readable does.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return bool(aoReadable);
2246
2247
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    The optional fnProcessEvents callback should be set to vbox.processPendingEvents
    or similar.

    Returns the new Session object, or None if creating the transport or
    the session raised (the exception is reported as an error).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    oSession = None;
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return oSession;
2264
2265
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    This differs from openTcpSession in that it won't log a connection failure
    as an error.  The session is created with fTryConnect = True.

    Returns the new Session object, or None if creating the transport or
    the session raised (the exception is still reported as an error).
    """
    oSession = None;
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
    return oSession;
2280
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette