VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 82894

Last change on this file since 82894 was 79662, checked in by vboxsync, 6 years ago

ValidationKit/txsclient.py: Fix python 3.x incompatibility when handling stdin packets in the EXEC task. zlib.crc32 wants bytes or an array of bytes as the input type and creating a byte array requires the input string to be encoded as bytes

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 83.5 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 79662 2019-07-10 08:40:51Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2019 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 79662 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used by both the Session and Transport classes.
56#
57
def getU32(abData, off):
    """
    Decodes the little endian unsigned 32-bit field at offset off in abData.

    abData must be indexable and yield integer byte values (e.g. array('B')).
    """
    return sum(abData[off + iByte] << (8 * iByte) for iByte in range(4));
64
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    The field starts at offset off in abData and is decoded as UTF-8.
    Returns sDefault if the string is invalid (not terminated, offset out of
    range or not decodable); decoding problems are also logged.
    """
    cchStr = getSZLen(abData, off);
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)];
        try:
            # array.tostring() was removed in Python 3.9, while Python 2
            # arrays have no tobytes(); use whichever is available.
            if hasattr(abStr, 'tobytes'):
                return abStr.tobytes().decode('utf_8');
            return abStr.tostring().decode('utf_8');
        except:
            reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;
78
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    Scans abData from offset off for the first zero byte.
    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    for offTerminator in range(off, len(abData)):
        if abData[offTerminator] == 0:
            return offTerminator - off;
    return -1;
95
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long, starts with two upper case
    letters and continues with upper case letters, digits, '-', '_' or
    blanks (the padding).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # Check every position; the previous ranges (0..0 and 2..6) left the
    # second and the last character unvalidated.
    for i, chOpcode in enumerate(sOpcode):
        sValidChars = sSet1 if i < 2 else sSet2;
        if chOpcode not in sValidChars:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Only the low 32 bits are encoded, least significant byte first.
    """
    return array.array('B', [(u32 >> cShift) & 0xff for cShift in (0, 8, 16, 24)]);
128
def escapeString(sString):
    """
    Doubles every '$' in the string so TXS doesn't try to do variable
    expansion on it.
    """
    return '$$'.join(sString.split('$'));
134
135
136
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the raw connection primitives (connect, disconnect,
    sendBytes, recvBytes, isConnectionOk, isRecvPending); the message framing
    in sendMsgInt, recvMsg and sendMsg is implemented here on top of them.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header kept by recvMsg when reading the payload failed; it is
        # consumed again by the next recvMsg call.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode is blank padded to 8 characters; abPayload is the already
        encoded payload (array('B')), None meaning no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # Avoid sharing one mutable default array between calls.
        if abPayload is None:
            abPayload = array.array('B');

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message: length, CRC placeholder, opcode, payload.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0)); # uCrc32
        try:
            abMsg.extend(array.array('B', [ord(chOpcode) for chOpcode in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload), pad it to a 16 byte multiple and send it off.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, reusing one saved by a previous failed payload read.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg   = getU32(abHdr, 0);
        uCrc32  = getU32(abHdr, 4);
        abOpcode = abHdr[8:16];
        try:
            # array.tostring() was removed in Python 3.9, while Python 2
            # arrays have no tobytes(); use whichever is available.
            if hasattr(abOpcode, 'tobytes'):
                sOpcode = abOpcode.tobytes().decode('ascii');
            else:
                sOpcode = abOpcode.tostring().decode('ascii');
        except UnicodeDecodeError:
            # A garbled header is reported as a failure rather than raised.
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abOpcode,));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload read.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32 (a zero CRC in the header skips the check).
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Payload items are encoded by type: strings as zero terminated UTF-8,
        integers as little endian uint32_t and arrays verbatim.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    if sys.version_info[0] >= 3:
                        abPayload.extend(o.encode('utf_8'));
                    else:
                        # the primitive approach...
                        sUtf8 = o.encode('utf_8');
                        for ch in sUtf8:
                            abPayload.append(ord(ch))
                    abPayload.append(0);
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
387
388
389class Session(TdTaskBase):
390 """
391 A Test eXecution Service (TXS) client session.
392 """
393
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    Initializes the task state and immediately kicks off the connect task
    (taskConnect with cMsIdleFudge) via startTask.  Raises base.GenError if
    that task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);

    # Connection related state.
    self.oTransport         = oTransport;
    self.fTryConnect        = fTryConnect;
    self.fScrewedUpMsgState = False;

    # Current task state, (re-)initialized by startTask.
    self.sStatus            = "";
    self.cMsTimeout         = 0;
    self.msStart            = 0;
    self.fErr               = True; # Whether to report errors as error.
    self.oThread            = None;
    self.fnTask             = self.taskDummy;
    self.aTaskArgs          = None;

    # Results of the task.
    self.oTaskRc            = None;
    self.t3oReply           = (None, None, None);

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
417
def __del__(self):
    """Cancels any task still pending when the session object is deleted."""
    self.cancelTask();
421
def toString(self):
    """Stringifies the session, including the base task state, for logging."""
    sFormat = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
              ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    aoValues = (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
                self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport,
                self.oThread);
    return sFormat % aoValues;
427
def taskDummy(self):
    """
    Default fnTask value; always raises so that running a task without
    having set it up properly is noticed.
    """
    raise Exception();
431
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks of a new task.

    cMsTimeout:         The task timeout in milliseconds.  Values less than
                        500 ms will be adjusted to 500 ms.  This means it is
                        OK to use negative value.
    fIgnoreErrors:      When set, failures are not reported as errors.
    sStatus:            The task status.
    fnTask:             The method that'll execute the task.
    aArgs:              Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Change status to "setup" under the lock; bail out if some other task
    # got in between (status no longer empty).
    self.lockTask();
    if self.sStatus != "":
        self.unlockTask();
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;
    self.sStatus        = "setup";
    self.oTaskRc        = None;
    self.t3oReply       = (None, None, None);
    self.resetTaskLocked();
    self.unlockTask();

    self.cMsTimeout     = max(cMsTimeout, 500);
    self.fErr           = not fIgnoreErrors;
    self.fnTask         = fnTask;
    self.aTaskArgs      = aArgs;
    self.oThread        = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
    # Thread.setDaemon() is deprecated; the attribute works on all versions.
    self.oThread.daemon = True;
    self.msStart        = base.timestampMilli();

    # Publish the real status before the thread starts running.
    self.lockTask();
    self.sStatus        = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
475
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  When set, waits up to 61 seconds for the task thread to exit.

    Returns success indicator (True/False).  True is returned when no task
    is pending; False when a task is being set up, is already cancelled or
    when fSync is clear.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus == "setup":
        self.unlockTask();
        return False;
    if self.sStatus == "cancelled":
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);
    # Thread.isAlive() was removed in Python 3.9; is_alive() exists since 2.6.
    # NOTE(review): this returns True when the thread is *still running* after
    # the join, which looks inverted relative to "success indicator" - kept
    # as-is for compatibility, confirm against callers before changing.
    return oThread.is_alive();
506
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call:
    it runs fnTask with aTaskArgs (unless already cancelled), stores the
    result in oTaskRc, clears the status and signals the task.
    """
    # Must be defined on every path: it was previously unbound when the task
    # had been cancelled before starting, making the log statement below
    # raise while holding the task lock.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
532
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.
    Reads the status under the task lock; returns True/False.
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    self.unlockTask();
    return sCurStatus == "cancelled";
541
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.
    Returns True when getMsLeft() reports no time left, otherwise False.
    """
    return self.getMsLeft() <= 0;
548
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout, in milliseconds.

    The result is never below cMsMin and, when cMsMax is positive, never
    above cMsMax.
    """
    cMsSinceStart = base.timestampMilli() - self.msStart;
    if cMsSinceStart < 0:
        return cMsMin;

    cMsRemaining = self.cMsTimeout - cMsSinceStart;
    if cMsRemaining <= cMsMin:
        return cMsMin;
    if cMsMax > 0 and cMsRemaining > cMsMax:
        return cMsMax;
    return cMsRemaining;
560
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    (in t3oReply, under the task lock) so the client can inspect it later.

    When cMsTimeout is None, the time left of the task is used (at least
    500 ms).  Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    cMsWait = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout;
    t3oReply = self.oTransport.recvMsg(cMsWait, fNoDataOk);
    cbMsg, sOpcode, abPayload = t3oReply;

    self.lockTask();
    self.t3oReply = (cbMsg, sOpcode, abPayload);
    self.unlockTask();
    return (cbMsg, sOpcode, abPayload);
573
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode itself
    when the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
    if cbMsg is None:
        return False;

    sStripped = sOpcode.strip();
    if sStripped == "ACK":
        return True;
    sDetails = getSZ(abPayload, 0, sStripped);
    return (sStripped, sDetails);
590
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    When fNoDataOk is set, nothing is logged and the recvAck result is
    returned unmodified.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is True or fNoDataOk:
        return rc;

    if rc is False:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
    else:
        sOpcode, sDetails = rc[0], rc[1];
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, sDetails));
    return False;
605
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand is only used for logging.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        # Log under this method's name (was mislabelled 'recvAckLogged').
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;
623
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout:
    the time left of the task (at least 500 ms) unless cMsTimeout is given.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout;
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload);
631
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    Calls fnAsync(*aArgs) to start the task, waits for it (task timeout plus
    5 seconds) and fetches its result.

    Returns False on failure, task return status on success.
    """
    fStarted = fnAsync(*aArgs);
    if fStarted is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return fStarted;

    fWaited = self.waitForTask(self.cMsTimeout + 5000);
    if fWaited is False:
        reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask failed...');
        self.cancelTask();
        return False;

    return self.getResult();
653
654 #
655 # Connection tasks.
656 #
657
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS, retrying until connected, cancelled or
    timed out, and greets it (taskGreet) once connected.

    Returns the taskGreet result on connect, None on fatal transport errors
    and False on timeout or cancellation.
    """
    while True:
        if self.isCancelled():
            break;

        reporter.log2('taskConnect: connecting ...');
        fConnected = self.oTransport.connect(self.getMsLeft(500));
        if fConnected is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if fConnected is None:
            reporter.log2('taskConnect: unable to connect');
            return None;

        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;

        # Retry after at most a second.
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);

    if not self.fTryConnect:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
    return False;
678
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS (HOWDY).

    On success it idles for cMsIdleFudge milliseconds (in one second steps);
    on failure the transport is disconnected.  Returns the send/ack status.
    """
    rc = self.sendMsg("HOWDY", ());
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return rc;

    cMsLeftToIdle = cMsIdleFudge;
    while cMsLeftToIdle > 0:
        cMsLeftToIdle -= 1000;
        time.sleep(1);
    return rc;
691
def taskBye(self):
    """
    Says goodbye to the TXS (BYE) and disconnects the transport regardless
    of the outcome.  Returns the send/ack status.
    """
    fRc = self.sendMsg("BYE");
    if fRc is True:
        fRc = self.recvAckLogged("BYE");
    self.oTransport.disconnect();
    return fRc;
699
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly braces on success, False when
    the reply is missing or invalid, and the sendMsg status if the request
    could not be sent.
    """
    rc = self.sendMsg("UUID");
    if rc is not True:
        return rc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Validate by parsing; only the string form is returned.
    sUuid = '{%s}' % (sUuid,)
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
724
725 #
726 # Process task
727 # pylint: disable=missing-docstring
728 #
729
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program via the TXS EXEC command, feeding it standard input
    and collecting its output until a process status message arrives.

    oStdIn, oStdOut, oStdErr and oTestPipe may each be a string (passed on
    to TXS as-is), None (an empty string is sent) or an object; objects are
    announced as '|', get a running CRC-32 in their uTxsClientCrc32
    attribute and are closed before returning.

    Returns True if the final status is 'PROC OK', False for any other
    'PROC ' status and None on failures after EXEC was sent and
    acknowledged (a fake 'EXECFAIL' reply is then stored in t3oReply).
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            # Start value of the running stream CRC.
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
        if rc is True:
            # Loop till the process completes, feed input to the TXS and
            # receive output from it.
            sFailure            = "";
            msPendingInputReply = None;     # Timestamp of the last unanswered STDIN/STDINEOS.
            cbMsg, sOpcode, abPayload = (None, None, None);
            while True:
                # Pending input?  Only one stdin packet is in flight at a time.
                if     msPendingInputReply is None \
                   and oStdIn is not None \
                   and not utils.isString(oStdIn):
                    try:
                        sInput = oStdIn.read(65536);
                    except:
                        reporter.errorXcpt('read standard in');
                        sFailure = 'exception reading stdin';
                        rc = None;
                        break;
                    if sInput:
                        # Convert to a byte array before handing it of to sendMsg or the string
                        # will get some zero termination added breaking the CRC (and injecting
                        # unwanted bytes).
                        # NOTE(review): assumes read() returns text; a stream returning
                        # bytes has no encode() on Python 3 - confirm with callers.
                        abInput = array.array('B', sInput.encode('utf-8'));
                        oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                        rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                        if rc is not True:
                            sFailure = 'sendMsg failure';
                            break;
                        msPendingInputReply = base.timestampMilli();
                        continue;

                    # Nothing more to read, signal end of stream.
                    rc = self.sendMsg('STDINEOS');
                    # NOTE(review): the stream object is dropped here, so the cleanup
                    # loop at the end no longer sees (or closes) it - confirm intended.
                    oStdIn = None;
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();

                # Wait for input (500 ms timeout).
                if cbMsg is None:
                    cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
                if cbMsg is None:
                    # Check for time out before restarting the loop.
                    # Note! Only doing timeout checking here does mean that
                    #       the TXS may prevent us from timing out by
                    #       flooding us with data.  This is unlikely though.
                    if     self.hasTimedOut() \
                       and (   msPendingInputReply is None \
                            or base.timestampMilli() - msPendingInputReply > 30000):
                        reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                        sFailure = 'timeout';
                        rc = None;
                        break;
                    # Check that the connection is OK.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        sFailure = 'disconnected';
                        rc = False;
                        break;
                    continue;

                # Handle the response: map output opcodes to their sink.
                sOpcode = sOpcode.rstrip();
                if sOpcode == 'STDOUT':
                    oOut = oStdOut;
                elif sOpcode == 'STDERR':
                    oOut = oStdErr;
                elif sOpcode == 'TESTPIPE':
                    oOut = oTestPipe;
                else:
                    oOut = None;
                if oOut is not None:
                    # Output from the process: uint32_t stream CRC followed by the data.
                    if len(abPayload) < 4:
                        sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                        reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                        rc = None;
                        break;
                    uStreamCrc32 = getU32(abPayload, 0);
                    oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                    if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                        sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                            % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                        reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                        rc = None;
                        break;
                    try:
                        oOut.write(abPayload[4:]);
                    except:
                        sFailure = 'exception writing %s' % (sOpcode);
                        reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                        rc = None;
                        break;
                elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                    # Standard input is ignored.  Ignore this condition for now.
                    msPendingInputReply = None;
                    reporter.log('taskExecEx: Standard input is ignored... why?');
                    # NOTE(review): oStdIn is None here if this answers STDINEOS,
                    # in which case the del would raise - confirm.
                    del oStdIn.uTxsClientCrc32;
                    oStdIn = '/dev/null';
                elif  sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
                  and msPendingInputReply is not None:
                    # TXS STDIN error, abort.
                    # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                    # The same failure text is used for all three opcodes.
                    msPendingInputReply = None;
                    sFailure = 'TXS is out of memory for std input buffering';
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                elif sOpcode == 'ACK' and msPendingInputReply is not None:
                    msPendingInputReply = None;
                elif sOpcode.startswith('PROC '):
                    # Process status message, handle it outside the loop.
                    rc = True;
                    break;
                else:
                    sFailure = 'Unexpected opcode %s' % (sOpcode);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                # Clear the message.
                cbMsg, sOpcode, abPayload = (None, None, None);

            # If we sent an STDIN packet and didn't get a reply yet, we'll give
            # TXS some 5 seconds to reply to this.  If we don't wait here we'll
            # get screwed later on if we mix it up with the reply to some other
            # command.  Hackish.
            if msPendingInputReply is not None:
                cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
                if cbMsg2 is not None:
                    reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                                 % (cbMsg2, sOpcode2, abPayload2));
                    msPendingInputReply = None;
                else:
                    reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                    self.fScrewedUpMsgState = True;

            # Parse the exit status (True), abort (None) or do nothing (False).
            if rc is True:
                if sOpcode != 'PROC OK':
                    # Do proper parsing some other day if needed:
                    #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                    #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                    rc = False;
            else:
                if rc is None:
                    # Abort it.
                    reporter.log('taskExecEx: sending ABORT...');
                    rc = self.sendMsg('ABORT');
                    while rc is True:
                        cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                        if cbMsg is None:
                            reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                            self.fScrewedUpMsgState = True;
                            break;
                        if sOpcode.startswith('PROC '):
                            reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                            break;
                        reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                        # Check that the connection is OK before looping.
                        if not self.oTransport.isConnectionOk():
                            self.oTransport.disconnect();
                            break;

                # Fake response with the reason why we quit.
                # (sFailure starts out as "" and is never set to None, so this
                # condition always holds on this path.)
                if sFailure is not None:
                    self.t3oReply = (0, 'EXECFAIL', sFailure);
                rc = None;
        else:
            rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32;      # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close();                  # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
931
932 #
933 # Admin tasks
934 #
935
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    After a successful ACK it polls for up to 5 seconds for the server to
    drop the connection (or for incoming data) before disconnecting.

    Returns the recvAckLogged status for sCmd.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.
        uMsStart = base.timestampMilli();
        # The elapsed time check was inverted (>= 5000), which made the loop
        # a no-op; keep polling while we're still within the 5 seconds.
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
948
def taskReboot(self):
    """
    Sends REBOOT and waits for the ACK and the disconnect via
    hlpRebootShutdownWaitForAck.  Returns the sendMsg status if sending
    fails, otherwise the helper's status.
    """
    fSent = self.sendMsg('REBOOT');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
954
def taskShutdown(self):
    """
    Sends SHUTDOWN and waits for the ACK and the disconnect via
    hlpRebootShutdownWaitForAck.  Returns the sendMsg status if sending
    fails, otherwise the helper's status.
    """
    fSent = self.sendMsg('SHUTDOWN');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
960
961 #
962 # CD/DVD control tasks.
963 #
964
965 ## TODO
966
967 #
968 # File system tasks
969 #
970
def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR with (fMode, sRemoteDir) and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDIR');
976
def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH with (fMode, sRemoteDir) and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDRPATH');
982
def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends MKSYMLNK with (sLinkTarget, sLink) and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKSYMLNK');
988
def taskRmDir(self, sRemoteDir):
    """
    Sends RMDIR for sRemoteDir and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('RMDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMDIR');
994
def taskRmFile(self, sRemoteFile):
    """
    Sends RMFILE for sRemoteFile and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('RMFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMFILE');
1000
def taskRmSymlink(self, sRemoteSymlink):
    """
    Sends RMSYMLNK for sRemoteSymlink and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMSYMLNK');
1006
def taskRmTree(self, sRemoteTree):
    """
    Sends RMTREE for sRemoteTree and waits for the ACK.
    Returns the sendMsg status if sending fails, else the ACK status.
    """
    fSent = self.sendMsg('RMTREE', (sRemoteTree,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMTREE');
1012
def taskChMod(self, sRemotePath, fMode):
    """
    Sends CHMOD with (int(fMode), sRemotePath) as payload and collects the logged ACK.

    Returns the sendMsg status if it isn't True, otherwise the recvAckLogged result.
    """
    tPayload = (int(fMode), sRemotePath,);
    fSent = self.sendMsg('CHMOD', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHMOD');
1018
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """
    Sends CHOWN with (int(idUser), int(idGroup), sRemotePath) as payload and
    collects the logged ACK.

    Returns the sendMsg status if it isn't True, otherwise the recvAckLogged result.
    """
    tPayload = (int(idUser), int(idGroup), sRemotePath,);
    fSent = self.sendMsg('CHOWN', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHOWN');
1024
def taskIsDir(self, sRemoteDir):
    """
    Sends ISDIR for sRemoteDir and fetches the true/false reply.

    Returns the sendMsg status if it isn't True, otherwise the recvTrueFalse result.
    """
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
1030
def taskIsFile(self, sRemoteFile):
    """
    Sends ISFILE for sRemoteFile and fetches the true/false reply.

    Returns the sendMsg status if it isn't True, otherwise the recvTrueFalse result.
    """
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1036
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends ISSYMLNK for sRemoteSymlink and fetches the true/false reply.

    Returns the sendMsg status if it isn't True, otherwise the recvTrueFalse result.
    """
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1042
1043 #def "STAT "
1044 #def "LSTAT "
1045 #def "LIST "
1046
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the local file sLocalFile to sRemoteFile.

    The local file is opened first (binary read mode) so that a missing or
    unreadable file is reported before TXS is bothered.  The transfer itself
    is done by taskUploadCommon, which is shared with taskUploadString.

    Returns False (logged) if the local file cannot be opened, otherwise
    whatever taskUploadCommon returns.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Make sure the file is closed even if the common worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        oLocalFile.close();
1064
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the in-memory string sContent to sRemoteFile.

    The string is wrapped in a small object offering a file-like read() so
    the transfer can be done by taskUploadCommon.

    Returns whatever taskUploadCommon returns.
    """
    class StringReader(object): # pylint: disable=too-few-public-methods
        """Sequential, chunked reader over a string."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """Returns the next chunk of at most cbMax items, "" once exhausted."""
            cbLeft = len(self.sContent) - self.off;
            if cbLeft == 0:
                return "";
            offEnd = self.off + min(cbLeft, cbMax);
            sChunk = self.sContent[self.off:offEnd];
            self.off += len(sChunk);
            return sChunk;

    return self.taskUploadCommon(StringReader(sContent), sRemoteFile, fMode, fFallbackOkay);
1085
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method; it is read in chunks of up
    to 64 KB which are sent as DATA packets, each carrying the running CRC-32
    of everything sent so far.  An empty read ends the stream with DATA EOF.

    fMode selects the command: zero uses 'PUT FILE', non-zero uses 'PUT2FILE'
    with fallback to 'PUT FILE' when the reply opcode is 'UNKNOWN' and
    fFallbackOkay is set.

    Returns True when the whole stream incl. EOF was acknowledged, otherwise
    the failure status (False after a logged protocol/transport problem or
    after the ABORT handling at the end).
    """
    #
    # Command + ACK.
    #
    # Only used the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode == 0:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            # Plain recvAck here (not the logged variant) so an 'UNKNOWN'
            # reply can be turned into a fallback instead of an error.
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                # Neither True nor False: rc is indexed as (opcode, details) below.
                if rc[0] == 'UNKNOWN' and fFallbackOkay:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
                else:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                # rc = None marks a local I/O error; it triggers the ABORT below.
                rc = None;
                break;

            # Convert to array - this is silly!
            abBuf = array.array('B');
            if utils.isString(sRaw):
                # String input: one array element per character ordinal.
                for i, _ in enumerate(sRaw):
                    abBuf.append(ord(sRaw[i]));
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.
        # NOTE(review): within this block only the read() failure above
        # sets rc to None, so ACK errors (rc False) do not reach the ABORT.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1165
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile.

    The local file is opened for binary writing and handed to
    taskDownloadCommon.  If the download result is False, the (partial)
    local file is removed again; a failure to remove it is only logged.

    Returns False (logged) if the local file cannot be opened, otherwise
    whatever taskDownloadCommon returns.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Make sure the file is closed even if the common worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1182
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Downloads sRemoteFile into memory and returns it as one string.

    The chunks written by taskDownloadCommon are collected in a list.  On
    success, chunks having a decode() method are decoded using sEncoding
    ('ignore' error handling if fIgnoreEncodingErrors, else 'strict'), and
    all chunks are concatenated.

    Returns the string on success, otherwise the taskDownloadCommon status.
    """
    class ChunkCollector(object): # pylint: disable=too-few-public-methods
        """File-like sink that just remembers every buffer written to it."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            """Appends sBuf to the chunk list."""
            self.asContent.append(sBuf);

    oSink = ChunkCollector();
    rc = self.taskDownloadCommon(sRemoteFile, oSink);
    if rc is not True:
        return rc;

    sErrors = 'ignore' if fIgnoreEncodingErrors else 'strict';
    asParts = [sBuf.decode(sEncoding, sErrors) if hasattr(sBuf, 'decode') else sBuf
               for sBuf in oSink.asContent];
    return ''.join(asParts);
1203
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' and then processes DATA packets until DATA EOF.  Each
    packet starts with a 32-bit CRC of the stream so far, which is checked
    against a locally computed running CRC-32.  Data is passed to
    oLocalFile.write() as a byte string and each packet is ACKed.

    Returns True on success and False on failure.  Validation and local I/O
    errors are answered with a NACK before returning False.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.
            sOpcode = sOpcode.rstrip();
            if sOpcode not in ('DATA', 'DATA EOF',):
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                abData = abPayload[4:];
                # array.tostring() is gone in Python 3.9+, while tobytes()
                # does not exist in Python 2.  Use whichever is there.
                if hasattr(abData, 'tobytes'):
                    oLocalFile.write(abData.tobytes());
                else:
                    oLocalFile.write(abData.tostring());
            except:
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1262
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends UNPKFILE with (sRemoteFile, sRemoteDir) as payload and collects the logged ACK.

    Returns the sendMsg status if it isn't True, otherwise the recvAckLogged result.
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1268
1269 # pylint: enable=missing-docstring
1270
1271
1272 #
1273 # Public methods - generic task queries
1274 #
1275
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    Successful means the status string is empty and the task result is
    neither False nor None.  Both are sampled while holding the task lock.
    """
    self.lockTask();
    sStatusNow = self.sStatus;
    oResult = self.oTaskRc;
    self.unlockTask();
    return sStatusNow == "" and oResult is not False and oResult is not None;
1287
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None while the status string is non-empty (not completed yet).

    Status and result are sampled while holding the task lock.
    """
    self.lockTask();
    sStatusNow = self.sStatus;
    oResult = self.oTaskRc;
    self.unlockTask();
    return oResult if sStatusNow == "" else None;
1300
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.

    The tuple reference is sampled while holding the task lock.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1310
1311 #
1312 # Public methods - connection.
1313 #
1314
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a disconnect task (taskBye, status name "bye").

    Returns True on success, False on failure (logged).

    The task returns True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1324
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect, run through asyncToSync."""
    fnAsync = self.asyncDisconnect;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1328
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The status name used to be "bye" (copied from asyncDisconnect), which
    # mislabels the running task.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1338
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid, run through asyncToSync."""
    fnAsync = self.asyncUuid;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1342
1343 #
1344 # Public methods - execution.
1345 #
1346
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an exec process task (taskExecEx, status name "exec").

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None if on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe specify how to deal with
    these streams.  If None, no special action is taken and the output goes
    to where ever the TXS sends its output, and ditto for input.
        - To send to / read from the bitbucket, pass '/dev/null'.
        - To redirect to/from a file, just specify the remote filename.
        - To append to a file use '>>' followed by the remote filename.
        - To pipe the stream to/from the TXS, specify a file like
          object.  For StdIn a non-blocking read() method is required. For
          the other a write() method is required.  Watch out for deadlock
          conditions between StdIn and StdOut/StdErr/TestPipe piping.
    """
    # The second element is a fixed zero; the rest is passed straight through.
    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1375
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx, run through asyncToSync.

    Unlike the asynchronous variant, all four streams default to '/dev/null'.
    """
    tForward = (sExecName, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser,
                cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExecEx, *tForward);
1383
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an exec process test task (taskExecEx, status name "exec").

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None if on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    Standard in is taken from /dev/null.  Standard output and standard error
    are wrapped in reporter.FileWrapper objects named '<sPrefix>stdout' and
    '<sPrefix>stderr'.  The test pipe goes to a reporter.FileWrapperTestPipe
    when fWithTestPipe is set, otherwise to /dev/null.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    if fWithTestPipe:
        oTestPipe = reporter.FileWrapperTestPipe();
    else:
        oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type

    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1410
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec, run through asyncToSync."""
    tForward = (sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExec, *tForward);
1416
1417 #
1418 # Public methods - system
1419 #
1420
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a reboot task (taskReboot, status name "reboot").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, tNoArgs);
1431
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot, run through asyncToSync."""
    fnAsync = self.asyncReboot;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1435
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a shutdown task (taskShutdown, status name "shutdown").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, tNoArgs);
1445
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown, run through asyncToSync."""
    fnAsync = self.asyncShutdown;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1449
1450
1451 #
1452 # Public methods - file system
1453 #
1454
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a mkdir task (taskMkDir, status name "mkDir").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, tTaskArgs);
1464
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDir, run through asyncToSync."""
    fnAsync = self.asyncMkDir;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1468
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a mkdir -p task (taskMkDirPath, status name "mkDirPath").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, tTaskArgs);
1478
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDirPath, run through asyncToSync."""
    fnAsync = self.asyncMkDirPath;
    return self.asyncToSync(fnAsync, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1482
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a symlink creation task (taskMkSymlink, status name "mkSymlink").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, tTaskArgs);
1492
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink, run through asyncToSync."""
    fnAsync = self.asyncMkSymlink;
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1496
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmdir task (taskRmDir, status name "rmDir").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, tTaskArgs);
1506
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir, run through asyncToSync."""
    fnAsync = self.asyncRmDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1510
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmfile task (taskRmFile, status name "rmFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, tTaskArgs);
1520
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile, run through asyncToSync."""
    fnAsync = self.asyncRmFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1524
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmsymlink task (taskRmSymlink, status name "rmSymlink").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, tTaskArgs);
1534
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink, run through asyncToSync."""
    fnAsync = self.asyncRmSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1538
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmtree task (taskRmTree, status name "rmTree").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, tTaskArgs);
1548
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree, run through asyncToSync."""
    fnAsync = self.asyncRmTree;
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors);
1552
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a chmod task (taskChMod, status name "chMod").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemotePath, fMode);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, tTaskArgs);
1562
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncChMod, run through asyncToSync."""
    fnAsync = self.asyncChMod;
    return self.asyncToSync(fnAsync, sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
1566
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a chown task (taskChOwn, status name "chOwn").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemotePath, idUser, idGroup);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, tTaskArgs);
1576
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn, run through asyncToSync.

    Returns whatever asyncToSync returns for the chown task.
    """
    # This used to dispatch to asyncChMod, which has one positional
    # parameter less and starts the wrong task.
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1580
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-dir query task (taskIsDir, status name "isDir").

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, tTaskArgs);
1591
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir, run through asyncToSync."""
    fnAsync = self.asyncIsDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1595
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-file query task (taskIsFile, status name "isFile").

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, tTaskArgs);
1606
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile, run through asyncToSync."""
    fnAsync = self.asyncIsFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1610
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-symbolic-link query task (taskIsSymlink, status name "isSymlink").

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, tTaskArgs);
1621
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink, run through asyncToSync."""
    fnAsync = self.asyncIsSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1625
1626 #def "STAT "
1627 #def "LSTAT "
1628 #def "LIST "
1629
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a file upload task (taskUploadFile, status name "upload").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, tTaskArgs);
1641
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadFile, run through asyncToSync."""
    tForward = (sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadFile, *tForward);
1645
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a string upload task (taskUploadString, status name "uploadString").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sContent, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, tTaskArgs);
1657
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadString, run through asyncToSync."""
    tForward = (sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadString, *tForward);
1661
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a file download task (taskDownloadFile, status name "downloadFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, tTaskArgs);
1671
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile, run through asyncToSync."""
    fnAsync = self.asyncDownloadFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1675
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a string download task (taskDownloadString, status name "downloadString").

    Returns True on success, False on failure (logged).

    The task returns the file content as a string (chunks decoded using
    sEncoding) on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, tTaskArgs);
1687
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString, run through asyncToSync."""
    tForward = (sRemoteFile, sEncoding, fIgnoreEncodingErrors, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadString, *tForward);
1693
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an unpack file task (taskUnpackFile, status name "unpackFile").

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, tTaskArgs);
1704
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile, run through asyncToSync."""
    fnAsync = self.asyncUnpackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1708
1709
1710class TransportTcp(TransportBase):
1711 """
1712 TCP transport layer for the TXS client session class.
1713 """
1714
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Stores the connection parameters; nothing is connected here.  The
    session calls back later on its worker thread to make the connection.

    When uPort is None, the port defaults to 5042 if fReversedSetup is
    False and to 5048 otherwise.
    """
    TransportBase.__init__(self, utils.getCallerName());
    self.sHostname = sHostname;
    self.fReversedSetup = fReversedSetup;
    if uPort is not None:
        self.uPort = uPort;
    elif fReversedSetup is False:
        self.uPort = 5042;
    else:
        self.uPort = 5048;
    self.oSocket = None;
    self.oWakeupW = None;
    self.oWakeupR = None;
    self.fConnectCanceled = False;
    self.fIsConnecting = False;
    self.oCv = threading.Condition();
    self.abReadAhead = array.array('B');
1731
def toString(self):
    """Returns a debug string with the base class string followed by the TCP transport state."""
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFormat % tValues;
1737
1738 def __isInProgressXcpt(self, oXcpt):
1739 """ In progress exception? """
1740 try:
1741 if isinstance(oXcpt, socket.error):
1742 try:
1743 if oXcpt.errno == errno.EINPROGRESS:
1744 return True;
1745 except: pass;
1746 # Windows?
1747 try:
1748 if oXcpt.errno == errno.EWOULDBLOCK:
1749 return True;
1750 except: pass;
1751 except:
1752 pass;
1753 return False;
1754
1755 def __isWouldBlockXcpt(self, oXcpt):
1756 """ Would block exception? """
1757 try:
1758 if isinstance(oXcpt, socket.error):
1759 try:
1760 if oXcpt.errno == errno.EWOULDBLOCK:
1761 return True;
1762 except: pass;
1763 try:
1764 if oXcpt.errno == errno.EAGAIN:
1765 return True;
1766 except: pass;
1767 except:
1768 pass;
1769 return False;
1770
1771 def __isConnectionReset(self, oXcpt):
1772 """ Connection reset by Peer or others. """
1773 try:
1774 if isinstance(oXcpt, socket.error):
1775 try:
1776 if oXcpt.errno == errno.ECONNRESET:
1777 return True;
1778 except: pass;
1779 try:
1780 if oXcpt.errno == errno.ENETRESET:
1781 return True;
1782 except: pass;
1783 except:
1784 pass;
1785 return False;
1786
1787 def _closeWakeupSockets(self):
1788 """ Closes the wakup sockets. Caller should own the CV. """
1789 oWakeupR = self.oWakeupR;
1790 self.oWakeupR = None;
1791 if oWakeupR is not None:
1792 oWakeupR.close();
1793
1794 oWakeupW = self.oWakeupW;
1795 self.oWakeupW = None;
1796 if oWakeupW is not None:
1797 oWakeupW.close();
1798
1799 return None;
1800
def cancelConnect(self):
    """
    Cancels a connect in progress.

    Sets fConnectCanceled and, if a connect is in progress, closes the
    connecting socket and pokes the write end of the wakeup socket pair
    (send, shutdown, close).  Failures of the send and shutdown are only
    logged.  Everything is done while holding oCv.
    """
    # This is bad stuff.
    self.oCv.acquire();
    try:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # Must be bytes: socket.send() rejects str on Python 3.
                try: oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try: oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
    finally:
        # Never leave the condition variable locked, whatever goes wrong above.
        self.oCv.release();
1823
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (sHostname, uPort), listens and accepts one connection.
    If the first accept() reports the operation as in progress, select() is
    used to wait up to cMsTimeout milliseconds on oSocket and oWakeupR
    before accept() is tried once more.

    Returns True once a connection has been accepted, False if the second
    accept() attempt still reports in-progress (timeout), and None on
    failure or cancellation (logged).  Unexpected socket errors from the
    select()/accept() retry are re-raised to the caller.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            # Use .errno rather than indexing, exceptions aren't subscriptable on Python 3.
            if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptAccept:
            if not self.__isInProgressXcpt(oXcptAccept):
                if oXcptAccept.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    if not self.fConnectCanceled:
        self.oSocket.close();
        self.oSocket = oClientSocket;
        self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
    self.oCv.release();
    return True;
1896
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Calls connect() on oSocket for (sHostname, uPort).  If that reports the
    operation as in progress, select() waits up to cMsTimeout milliseconds
    for oSocket to become writable (also watching oWakeupR), and SO_ERROR is
    then queried for the outcome.

    Returns True on success and False for the error codes listed below as
    worth retrying.  For other failures the return value is None or False
    depending on where the error surfaced; these are logged as fatal unless
    it's EBADF after a cancellation.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        iRc = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                # Neither writable nor exceptional: treated as a timeout, even
                # if only the wakeup socket became readable.
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error as oXcpt2:
                iRc = oXcpt2.errno;
            except:
                # -42 is just a marker that matches none of the errno values checked below.
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        if rc is True:
            pass;
        elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
            rc = False; # try again.
        else:
            # EBADF after a cancel is expected (see the check), so it isn't reported.
            if iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
1934
1935
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket and establishes the connection, either
    by accepting one (self.fReversedSetup) or by connecting out as a client.

    cMsTimeout is the wait time in milliseconds; it is handed on to the
    _connectAsServer / _connectAsClient worker.

    Returns True when connected.  Returns None when the socket cannot be
    created or switched to non-blocking mode.  Returns False, without
    attempting to connect, when self.fConnectCanceled is already set.
    Otherwise the worker's status is returned, except that a successful
    connect is turned into False if self.fConnectCanceled got set in the
    meantime.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:
            (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
        except:
            reporter.logXcpt('socket.socketpair() failed');

    # Publish the sockets, unless the connect has already been canceled.
    rc = None;
    self.oCv.acquire();
    try:
        fCanceled = self.fConnectCanceled;
        if not fCanceled:
            self.oSocket = oSocket;
            self.oWakeupW = oWakeupW;
            self.oWakeupR = oWakeupR;
            self.fIsConnecting = True;
    finally:
        self.oCv.release();

    if fCanceled:
        # The sockets were never stored on self, so nothing else can close
        # them or interrupt a wait on them.  Close them here and skip the
        # connect attempt.
        for oOrphan in (oSocket, oWakeupR, oWakeupW):
            if oOrphan is not None:
                try:
                    oOrphan.close();
                except:
                    reporter.logXcpt('closing unused socket failed');
        rc = False;
    else:
        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
    oSocket = None;

    # Update the state and cleanup on failure/cancel.  The finally clause
    # makes sure the lock is released even if closing a socket raises.
    self.oCv.acquire();
    try:
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;

        if rc is not True:
            if self.oSocket is not None:
                self.oSocket.close();
                self.oSocket = None;
            self._closeWakeupSockets();
    finally:
        self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
1992
def disconnect(self, fQuiet = False):
    """
    Shuts down and closes the connection socket (if there is one), discards
    any read-ahead data and closes the wakeup sockets.

    fQuiet suppresses the error report when shutdown(SHUT_WR) fails.

    self.oCv is held while closing and is always released again, also when
    closing raises.
    """
    fHaveSocket = self.oSocket is not None;
    if fHaveSocket:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0); # just in case it's not set.
            sData = "1";
            while sData:
                sData = self.oSocket.recv(16384);
        except:
            # Best effort only: the loop normally ends by recv raising on
            # the non-blocking socket once nothing more is pending.
            pass;

    # Close it.
    self.oCv.acquire();
    try:
        if fHaveSocket:
            try:
                self.oSocket.setblocking(1);
            except:
                pass;
            self.oSocket.close();
            self.oSocket = None;
        self._closeWakeupSockets();
    finally:
        self.oCv.release();
2021
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf over the connection socket.

    A plain send is tried first.  If that does not take everything, the
    remainder is sent from a select() driven loop that gives up once more
    than cMsTimeout milliseconds have elapsed.

    Returns True when every byte was sent.  Returns False when there is no
    connection, on send/select failure and on timeout; the reason is
    reported through reporter.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Optimistic attempt at sending the whole buffer in one go.
    try:
        cbSent = self.oSocket.send(abBuf);
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbSent = 0;
    else:
        if cbSent == len(abBuf):
            return True;

    # Send the rest while keeping an eye on the clock.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            return False;

        # Wait for the socket to become writable.
        try:
            _, aoWritable, aoFailed = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            if aoFailed and not aoWritable:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                return False;
            if not aoWritable:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Push out whatever is still unsent; a would-block just means another round.
        try:
            cbSent += self.oSocket.send(abBuf[cbSent:]);
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;
        else:
            if cbSent == len(abBuf):
                return True;
2070
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb items from self.abReadAhead and returns them; the
    remainder stays in self.abReadAhead.  The caller must make sure at
    least cb items are buffered.
    """
    abPending = self.abReadAhead;
    assert cb <= len(abPending);
    self.abReadAhead = abPending[cb:];
    return abPending[:cb];
2077
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, using self.abReadAhead to hold partial data.

    A plain recv is tried first.  If that is not enough, a select() driven
    loop keeps reading until cb bytes are buffered or more than cMsTimeout
    milliseconds have elapsed.

    fNoDataOk suppresses the error report for a timeout, and for a
    connection reset, when nothing at all has been buffered.

    Returns the first cb buffered bytes (as taken from self.abReadAhead) on
    success, otherwise None.  If the peer has shut down the connection,
    disconnect() is called before returning None.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Top up the read-ahead buffer without any timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Keep reading until we have enough or run out of time.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            if not (fNoDataOk and not self.abReadAhead):
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            return None;

        # Wait for the socket to become readable.
        try:
            aoReadable, _, aoFailed = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            if aoFailed and not aoReadable:
                reporter.error('TranportTcp.recvBytes: select returned with exception');
                return None;
            if not aoReadable:
                if not (fNoDataOk and not self.abReadAhead):
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Read what has arrived; an empty result means the peer is gone.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Got everything?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);
2145
def isConnectionOk(self):
    """
    Quick health check of the connection socket.

    Returns False when there is no socket, when select() flags the socket
    as being in an exceptional state, or when polling it or sending zero
    bytes on it raises.  Returns True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        aoFailed = select.select([], [], [self.oSocket], 0.0)[2];
        if not aoFailed:
            self.oSocket.send(array.array('B')); # send zero bytes.
            return True;
    except:
        pass;
    return False;
2158
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the connection socket is readable, waiting up to
    cMsTimeout milliseconds in select().

    Returns False only when select() completes and reports nothing to
    read.  A failing select() is swallowed and yields True.
    """
    try:
        fPending = bool(select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0]);
    except:
        fPending = True;
    return fPending;
2167
2168
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp is created from sHostname, uPort and fReversedSetup and
    wrapped in a Session together with cMsTimeout, cMsIdleFudge and
    fnProcessEvents.

    The optional fnProcessEvents callback should be set to
    vbox.processPendingEvents or similar.

    Returns the Session object, or None if creating the transport or the
    session raised (the exception is reported as an error).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2185
2186
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    Same as openTcpSession except that the Session is created with
    fTryConnect = True, so that a connection failure isn't logged as an
    error.

    Returns the Session object, or None if creating the transport or the
    session raised (the exception is reported as an error).
    """
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2201
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette