VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 79450

Last change on this file since 79450 was 79447, checked in by vboxsync, 5 years ago

ValKit/txsclient.py: Added chmod, chown and optional mode mask to the upload methods to better handle unix guests. bugref:9151

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 83.3 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 79447 2019-07-01 15:48:27Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2019 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 79447 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used by both the Session and Transport classes.
56#
57
def getU32(abData, off):
    """
    Get a U32 field.

    Decodes the four bytes at abData[off] thru abData[off + 3] as a little
    endian unsigned 32-bit value and returns it.
    """
    uValue = 0
    # Walk from the most significant byte (off + 3) down to the least (off).
    for iByte in (3, 2, 1, 0):
        uValue = uValue * 256 + abData[off + iByte]
    return uValue
64
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    abData:     The byte array holding the packet data.
    off:        Offset of the first byte of the string.
    sDefault:   What to return when the string cannot be retrieved.

    The bytes up to, but not including, the zero terminator are decoded as
    UTF-8.

    Returns the decoded string on success.
    Returns sDefault if the string is invalid, i.e. getSZLen() could not find
    a terminator or the decoding failed (the latter is logged).
    """
    cchStr = getSZLen(abData, off)
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)]
        try:
            # array.tostring() was removed in Python 3.9 and array.tobytes()
            # does not exist in Python 2; bytearray() accepts the array in both.
            return bytearray(abStr).decode('utf_8')
        except Exception: # pylint: disable=broad-except
            reporter.errorXcpt('getSZ(,%u)' % (off))
    return sDefault
78
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    The terminator itself is not counted.

    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    cbData = len(abData)
    offEnd = off
    while offEnd < cbData:
        if abData[offEnd] == 0:
            return offEnd - off
        offEnd += 1
    # Ran off the end of the data without seeing a terminator.
    return -1
95
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters (A-Z), the remaining six may additionally be
    digits, dash, underscore or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    sSet2 = sSet1 + "0123456789-_ "
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)))
        return False
    # Check every position: the previous ranges (0..0 and 2..6) skipped
    # characters #1 and #7.
    for i, ch in enumerate(sOpcode):
        sValid = sSet1 if i < 2 else sSet2
        if ch not in sValid:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode))
            return False
    return True
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Returns a four element array('B'), least significant byte first.
    """
    abRet = [(u32 // uDivisor) % 256 for uDivisor in (1, 256, 65536, 16777216)]
    return array.array('B', abRet)
128
129
130
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the byte level methods (cancelConnect, connect,
    disconnect, sendBytes, recvBytes, isConnectionOk and isRecvPending).  The
    message level methods (sendMsg, sendMsgInt and recvMsg) are implemented
    here on top of them.

    Message layout as produced by sendMsgInt and parsed by recvMsg:
        - 4 bytes: Little endian message length (16 byte header + payload,
                   not counting the padding).
        - 4 bytes: Little endian CRC-32 of the opcode and payload.
        - 8 bytes: The opcode, space padded.
        - The payload, followed by zero padding up to a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller)
        self.fDummy = 0
        # Header of a message whose payload could not be received; recvMsg
        # picks it up again on the next call instead of reading a new header.
        self.abReadAheadHdr = array.array('B')

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated)

    def __str__(self):
        return self.toString()

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet
        return True

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout
        return False

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk
        return None

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters; it is space padded to 8.
        cMsTimeout: The send timeout in milliseconds, passed to sendBytes.
        abPayload:  The already encoded payload (array('B')).  The shared
                    default array is only read, never modified.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode))
            return False
        sOpcode = sOpcode.ljust(8)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode))
            return False

        # Start constructing the message: length, CRC placeholder, opcode, payload.
        cbMsg = 16 + len(abPayload)
        abMsg = array.array('B')
        abMsg.extend(u32ToByteArray(cbMsg))
        abMsg.extend((0, 0, 0, 0))     # uCrc32
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode[:8]]))
            if abPayload:
                abMsg.extend(abPayload)
        except Exception: # pylint: disable=broad-except
            reporter.fatalXcpt('sendMsgInt: packing problem...')
            return False

        # Checksum it (opcode + payload, before padding), pad it and send it off.
        uCrc32 = zlib.crc32(abMsg[8:])
        abMsg[4:8] = u32ToByteArray(uCrc32)

        while len(abMsg) % 16:
            abMsg.append(0)

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout))
        return self.sendBytes(abMsg, cMsTimeout)

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: The receive timeout in milliseconds, passed to recvBytes.
        fNoDataOk:  Passed to recvBytes for the header read; also suppresses
                    the log entry when the payload cannot be received.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, reusing the one left over from a failed payload read.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16)
            abHdr = self.abReadAheadHdr
            self.abReadAheadHdr = array.array('B')
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk) # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None)
        if len(abHdr) != 16:
            reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)))
            return (None, None, None)

        # Unpack and validate the header.
        cbMsg   = getU32(abHdr, 0)
        uCrc32  = getU32(abHdr, 4)
        try:
            # array.tostring() was removed in Python 3.9; bytearray() works
            # with both Python 2 and 3.
            sOpcode = bytearray(abHdr[8:16]).decode('ascii')
        except UnicodeDecodeError:
            # Garbage on the wire must not escape as an exception.
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abHdr[8:16],))
            return (None, None, None)

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg))
            return (None, None, None)
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg))
            return (None, None, None)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode))
            return (None, None, None)

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B')
        if cbMsg > 16:
            cbPadding = (16 - (cbMsg % 16)) % 16
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False) # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!')
                return (None, None, None)

            while cbPadding > 0:
                abPayload.pop()
                cbPadding = cbPadding - 1

        # Check the CRC-32.  A zero CRC in the header means it is not checked.
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:])
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32)
            uActualCrc32 = uActualCrc32 & 0xffffffff
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)))
                return (None, None, None)

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)))
        return (cbMsg, sOpcode, abPayload)

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Each payload item is encoded according to its type:
            - Strings are UTF-8 encoded and zero terminated.
            - Integers are encoded as little endian uint32_t.
            - array.array objects are appended as they are.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B')
        for o in aoPayload:
            try:
                if utils.isString(o):
                    # bytearray() yields integers in both Python 2 and 3.
                    abPayload.extend(bytearray(o.encode('utf_8')))
                    abPayload.append(0)
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)))
                        return None
                    abPayload.extend(u32ToByteArray(o))
                elif isinstance(o, array.array):
                    abPayload.extend(o)
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload))
                    return None
            except Exception: # pylint: disable=broad-except
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...')
                return None
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload)
381
382
383class Session(TdTaskBase):
384 """
385 A Test eXecution Service (TXS) client session.
386 """
387
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    This starts by connecting to the TXS and will enter the signalled state
    when connected or the timeout has been reached.

    Raises base.GenError if the connect task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents)

    # The connection and how to treat problems with it.
    self.oTransport         = oTransport
    self.fTryConnect        = fTryConnect
    self.fErr               = True      # Whether to report errors as error.
    self.fScrewedUpMsgState = False

    # State of the current task (see startTask and taskThread).
    self.sStatus            = ""
    self.cMsTimeout         = 0
    self.msStart            = 0
    self.oThread            = None
    self.fnTask             = self.taskDummy
    self.aTaskArgs          = None
    self.oTaskRc            = None
    self.t3oReply           = (None, None, None)

    # Kick off the connect task right away.
    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,))
    if not fStarted:
        raise base.GenError("startTask failed")
411
def __del__(self):
    """Cancels whatever task may still be pending when the session is deleted."""
    self.cancelTask()
415
def toString(self):
    """
    Stringify the instance for logging and debugging.

    Returns the base class string followed by the session members.
    """
    sFmt = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
           ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>'
    aoValues = (
        TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
        self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread,
    )
    return sFmt % aoValues
421
def taskDummy(self):
    """
    Place holder task method for catching broken state handling.

    Always raises Exception.
    """
    raise Exception()
425
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks off a new task.

    cMsTimeout:         The task timeout in milliseconds.  Values less than
                        500 ms will be adjusted to 500 ms.  This means it is
                        OK to use negative value.
    fIgnoreErrors:      Whether problems are reported as errors (False) or
                        not (True); also sets self.fErr for the task.
    sStatus:            The task status.
    fnTask:             The method that'll execute the task.
    aArgs:              Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.')
        return False

    # Change status to "setup" and make sure we're the only one starting a
    # task: anybody else seeing a non-empty status backs off.
    self.lockTask()
    if self.sStatus != "":
        self.unlockTask()
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.')
        return False
    self.sStatus        = "setup"
    self.oTaskRc        = None
    self.t3oReply       = (None, None, None)
    self.resetTaskLocked()
    self.unlockTask()

    self.cMsTimeout     = max(cMsTimeout, 500)
    self.fErr           = not fIgnoreErrors
    self.fnTask         = fnTask
    self.aTaskArgs      = aArgs
    self.oThread        = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)))
    # Thread.setDaemon() is deprecated since Python 3.10; the attribute
    # works with Python 2.6+ as well.
    self.oThread.daemon = True
    self.msStart        = base.timestampMilli()

    # Publish the real status before letting the thread loose.
    self.lockTask()
    self.sStatus        = sStatus
    self.unlockTask()
    self.oThread.start()

    return True
469
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  When True, wait up to 61 seconds for the task thread to
            terminate after flagging the task as cancelled.

    Returns success indicator (True/False):
        - True if there is no pending task or the task thread terminated.
        - False if a task is being set up, a cancellation is already in
          progress, fSync is False, or the thread is still running after
          the wait.
    """
    self.lockTask()

    if self.sStatus == "":
        self.unlockTask()
        return True
    if self.sStatus in ("setup", "cancelled"):
        self.unlockTask()
        return False

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus))
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect()

    self.sStatus = "cancelled"
    oThread = self.oThread
    self.unlockTask()

    if not fSync:
        return False

    oThread.join(61.0)
    # Success means the thread is gone.  (Thread.isAlive() was removed in
    # Python 3.9; is_alive() is available in Python 2.6+ too.)
    return not oThread.is_alive()
500
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call.

    The task method (self.fnTask) is called with self.aTaskArgs unless the
    task was cancelled before the thread got going.  Afterwards the result is
    stored in self.oTaskRc, the status is cleared and the task is signalled.

    Returns None.
    """
    # Must be defined on all paths; it used to be unbound when the task was
    # cancelled before getting here.
    oTaskRc = None
    if not self.isCancelled():
        try:
            fnTask = self.fnTask
            oTaskRc = fnTask(*self.aTaskArgs)
        except: # pylint: disable=bare-except
            # Catch everything so the task always gets signalled below.
            reporter.fatalXcpt('taskThread', 15)
            oTaskRc = None
    else:
        reporter.log('taskThread: cancelled already')

    self.lockTask()

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc))
    self.oTaskRc = oTaskRc
    self.oThread = None
    self.sStatus = ''
    self.signalTaskLocked()

    self.unlockTask()
    return None
526
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.

    Returns True if the status is "cancelled", otherwise False.
    """
    self.lockTask()
    fCancelled = self.sStatus == "cancelled"
    self.unlockTask()
    return fCancelled
535
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.

    Returns True if getMsLeft() reports nothing left, otherwise False.
    """
    return self.getMsLeft() <= 0
542
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout.

    cMsMin:     The minimum value to return.  Also returned if the clock
                appears to have gone backwards since the task started.
    cMsMax:     Upper limit for the return value, only applied when positive.

    Returns the number of milliseconds left, clipped as described.
    """
    cMsElapsed = base.timestampMilli() - self.msStart
    cMsLeft    = self.cMsTimeout - cMsElapsed
    if cMsElapsed < 0 or cMsLeft <= cMsMin:
        return cMsMin
    if 0 < cMsMax < cMsLeft:
        return cMsMax
    return cMsLeft
554
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    so the client can inspect it later on.

    cMsTimeout:     Receive timeout in milliseconds.  When None, the time
                    left for the task is used (at least 500 ms).
    fNoDataOk:      Passed on to recvMsg.

    Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout
    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsEffective, fNoDataOk)
    t3oReply = (cbMsg, sOpcode, abPayload)

    self.lockTask()
    self.t3oReply = t3oReply
    self.unlockTask()
    return t3oReply
567
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode itself
    if the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk)
    if cbMsg is None:
        return False

    sStripped = sOpcode.strip()
    if sStripped != "ACK":
        return (sStripped, getSZ(abPayload, 0, sStripped))
    return True
584
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    sCommand:   The command name to use in the log messages.
    fNoDataOk:  Passed on to recvAck.  When True nothing is logged and the
                recvAck result is returned unmodified.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    """
    rc = self.recvAck(fNoDataOk)
    if rc is True or fNoDataOk:
        return rc

    if rc is False:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand))
    else:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]))
    return False
599
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command name to use in the log messages.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply()
    if cbMsg is None:
        # Log under our own name (the messages used to claim recvAckLogged).
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand))
        return None

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True
    if sOpcode == "FALSE":
        return False
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)))
    return None
617
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    When cMsTimeout is None, the time left for the task is used (at least
    500 ms).

    Returns whatever the transport's sendMsg returns.
    """
    cMsEffective = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload)
625
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    fnAsync is called with aArgs to start the task, then the task is waited
    on for the task timeout plus 5 seconds.  The task is cancelled if the
    wait fails.

    Returns False on failure, task return status on success.
    """
    fStarted = fnAsync(*aArgs)
    if fStarted is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync))
        return fStarted

    fWaited = self.waitForTask(self.cMsTimeout + 5000)
    if fWaited is False:
        reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask failed...')
        self.cancelTask()
        return False

    return self.getResult()
647
648 #
649 # Connection tasks.
650 #
651
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS.

    Keeps retrying (sleeping up to a second between attempts) until the
    transport connects, reports a fatal error, the task times out or is
    cancelled.  On a successful connect the TXS is greeted (taskGreet).

    Returns the taskGreet result when connected, None on fatal connect
    errors and False on timeout and cancellation.
    """
    while True:
        if self.isCancelled():
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: cancelled')
            return False

        reporter.log2('taskConnect: connecting ...')
        fConnected = self.oTransport.connect(self.getMsLeft(500))
        if fConnected is True:
            reporter.log('taskConnect: succeeded')
            return self.taskGreet(cMsIdleFudge)
        if fConnected is None:
            reporter.log2('taskConnect: unable to connect')
            return None

        # Retryable failure: give up if out of time, otherwise nap and retry.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out')
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out')
            return False
        time.sleep(self.getMsLeft(1, 1000) / 1000.0)
672
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS.

    Sends HOWDY and waits for the ACK.  On success it sleeps in one second
    steps until cMsIdleFudge milliseconds have been covered; on failure the
    transport is disconnected.

    Returns the send / acknowledge status.
    """
    rc = self.sendMsg("HOWDY", ())
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect)

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect)
        return rc

    cMsRemaining = cMsIdleFudge
    while cMsRemaining > 0:
        cMsRemaining -= 1000
        time.sleep(1)
    return rc
685
def taskBye(self):
    """
    Says goodbye to the TXS.

    Sends BYE, waits for the ACK if it could be sent and disconnects the
    transport in any case.

    Returns the send / acknowledge status.
    """
    fRc = self.sendMsg("BYE")
    if fRc is True:
        fRc = self.recvAckLogged("BYE")
    self.oTransport.disconnect()
    return fRc
693
def taskUuid(self):
    """
    Gets the TXS UUID.

    Sends UUID and expects an "ACK UUID" reply carrying the UUID as a string.

    Returns the UUID string enclosed in curly braces on success.
    Returns the sendMsg status if the request could not be sent.
    Returns False on bad or missing replies (logged).
    """
    rc = self.sendMsg("UUID")
    if rc is not True:
        return rc

    cbMsg, sOpcode, abPayload = self.recvReply()
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.')
        return False

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,))
        return False

    sUuid = getSZ(abPayload, 0)
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.')
        return False

    # Validate it by letting the uuid module parse the braced form.
    sUuid = '{%s}' % (sUuid,)
    try:
        _ = uuid.UUID(sUuid)
    except: # pylint: disable=bare-except
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,))
        return False
    return sUuid
718
719 #
720 # Process task
721 # pylint: disable=missing-docstring
722 #
723
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program via the TXS (EXEC command) and pumps its standard
    streams until the TXS reports the process status.

    sExecName:      The program to execute.
    fFlags:         Flags, sent as the first payload field.
    asArgs:         The argument strings.
    asAddEnv:       The environment change strings.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                    Each is either a string (sent to the TXS as-is), None
                    (sent as an empty string) or an object (sent as '|').
                    An oStdIn object is read using read(65536); the others
                    are written to using write().  All objects get a
                    temporary uTxsClientCrc32 attribute holding the running
                    stream CRC-32, and are closed before returning.
    sAsUser:        The user name field sent to the TXS.

    Returns True if the final status message is 'PROC OK'.
    Returns False on any other 'PROC ' status.
    Returns None on all other failures (send/ack failure, timeout, CRC and
    I/O errors, disconnect, unexpected opcodes); in the failures detected
    after EXEC was acknowledged, self.t3oReply is replaced by a fake
    'EXECFAIL' reply holding the reason.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            # Start value for the running CRC-32 of the stream.
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick off the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure = "";
        msPendingInputReply = None;   # Timestamp of the last unanswered STDIN/STDINEOS.
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?  Only one STDIN packet is in flight at a time.
            if msPendingInputReply is None \
               and oStdIn is not None \
               and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    oStdIn.uTxsClientCrc32 = zlib.crc32(sInput, oStdIn.uTxsClientCrc32);
                    # Convert to a byte array before handing it off to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).
                    abInput = array.array('B', sInput);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # Empty read: signal end of stream and stop feeding input.
                # NOTE(review): the stdin object is dropped here without being
                # closed and with uTxsClientCrc32 still set - confirm intended.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if self.hasTimedOut() \
                   and (   msPendingInputReply is None \
                        or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process: u32 stream CRC followed by the data.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                             % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                # NOTE(review): if this is the reply to STDINEOS, oStdIn is
                # already None and the del below would raise - verify.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
                 and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmit it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode != 'PROC OK':
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                rc = False;
        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.  (sFailure starts
            # out as an empty string, so this always applies.)
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    # NOTE(review): passing the same object for two streams would make the
    # second del raise AttributeError - verify against the callers.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close(); # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
925
926 #
927 # Admin tasks
928 #
929
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command name ('REBOOT' or 'SHUTDOWN'), used for logging.

    After a successful ACK this polls for up to 5 seconds for the server to
    drop the connection (or for incoming data) and then disconnects.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd)
    if rc is True:
        # Poll a little while for server to disconnect.  The elapsed time
        # check used to be inverted ('>= 5000'), so the loop never ran.
        uMsStart = base.timestampMilli()
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break
        self.oTransport.disconnect()
    return rc
942
def taskReboot(self):
    """
    Sends REBOOT and waits for the acknowledgement.

    Returns the sendMsg status if sending failed, otherwise the
    hlpRebootShutdownWaitForAck status.
    """
    fSent = self.sendMsg('REBOOT')
    if fSent is not True:
        return fSent
    return self.hlpRebootShutdownWaitForAck('REBOOT')
948
def taskShutdown(self):
    """
    Sends SHUTDOWN and waits for the acknowledgement.

    Returns the sendMsg status if sending failed, otherwise the
    hlpRebootShutdownWaitForAck status.
    """
    fSent = self.sendMsg('SHUTDOWN')
    if fSent is not True:
        return fSent
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN')
954
955 #
956 # CD/DVD control tasks.
957 #
958
959 ## TODO
960
961 #
962 # File system tasks
963 #
964
def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR with the mode and the directory path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('MKDIR', (fMode, sRemoteDir))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('MKDIR')
970
def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH with the mode and the directory path, then waits for the
    ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('MKDRPATH', (fMode, sRemoteDir))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('MKDRPATH')
976
def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends MKSYMLNK with the link target and the link path, then waits for
    the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('MKSYMLNK')
982
def taskRmDir(self, sRemoteDir):
    """
    Sends RMDIR with the directory path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('RMDIR', (sRemoteDir,))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('RMDIR')
988
def taskRmFile(self, sRemoteFile):
    """
    Sends RMFILE with the file path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('RMFILE', (sRemoteFile,))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('RMFILE')
994
def taskRmSymlink(self, sRemoteSymlink):
    """
    Sends RMSYMLNK with the symlink path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('RMSYMLNK', (sRemoteSymlink,))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('RMSYMLNK')
1000
def taskRmTree(self, sRemoteTree):
    """
    Sends RMTREE with the tree path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    fSent = self.sendMsg('RMTREE', (sRemoteTree,))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('RMTREE')
1006
def taskChMod(self, sRemotePath, fMode):
    """
    Sends CHMOD with the mode (converted to int) and the path, then waits
    for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    aoPayload = (int(fMode), sRemotePath,)
    fSent = self.sendMsg('CHMOD', aoPayload)
    if fSent is not True:
        return fSent
    return self.recvAckLogged('CHMOD')
1012
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """
    Sends CHOWN with the user ID, the group ID (both converted to int) and
    the path, then waits for the ACK.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    aoPayload = (int(idUser), int(idGroup), sRemotePath,)
    fSent = self.sendMsg('CHOWN', aoPayload)
    if fSent is not True:
        return fSent
    return self.recvAckLogged('CHOWN')
1018
def taskIsDir(self, sRemoteDir):
    """
    Sends ISDIR for sRemoteDir and reads the true/false reply.

    Returns the sendMsg() result if sending failed, else the
    recvTrueFalse() result.
    """
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
1024
def taskIsFile(self, sRemoteFile):
    """
    Sends ISFILE for sRemoteFile and reads the true/false reply.

    Returns the sendMsg() result if sending failed, else the
    recvTrueFalse() result.
    """
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1030
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends ISSYMLNK for sRemoteSymlink and reads the true/false reply.

    Returns the sendMsg() result if sending failed, else the
    recvTrueFalse() result.
    """
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1036
1037 #def "STAT "
1038 #def "LSTAT "
1039 #def "LIST "
1040
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the local file sLocalFile to sRemoteFile via taskUploadCommon.

    The local file is opened first so a missing/unreadable file is
    reported without bothering the TXS.

    Returns False (logged) if the local file cannot be opened, otherwise
    the taskUploadCommon() result.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadString.  The finally clause makes sure the
    # file handle is released even if the common worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        oLocalFile.close();
1058
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads sContent to sRemoteFile by presenting it to taskUploadCommon
    through a minimal read()-only file-like wrapper.

    Returns the taskUploadCommon() result.
    """
    class InStringFile(object): # pylint: disable=too-few-public-methods
        """ Sequential reader over an in-memory string. """
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """ Returns up to cbMax items from the current offset, "" at the end. """
            offStart = self.off;
            if len(self.sContent) == offStart:
                return "";
            # Slicing clips at the end of the content, covering the short tail.
            sChunk = self.sContent[offStart:(offStart + cbMax)];
            self.off = offStart + len(sChunk);
            return sChunk;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile, fMode, fFallbackOkay);
1079
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method; it is read in chunks of up
    to 64 KB, each sent as a DATA packet carrying the running CRC-32 of the
    stream, and the transfer is finished with a DATA EOF packet.

    Returns True when the whole stream was sent and acknowledged.  Returns
    False on transport errors, negative replies and local read errors (the
    latter after sending ABORT).  If the initial command is not accepted,
    the result of the failing sendMsg()/recvAckLogged() call is returned.
    """
    #
    # Command + ACK.
    #
    # Only used the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode == 0:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            # Not using recvAckLogged here because an 'UNKNOWN' reply is not
            # an error when the fallback is permitted.
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                # rc is indexed as (opcode, details) below.
                if rc[0] == 'UNKNOWN' and fFallbackOkay:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
                else:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                rc = None;  # None marks a local I/O error; handled after the loop.
                break;

            # Convert to array - this is silly!
            abBuf = array.array('B');
            if utils.isString(sRaw):
                for i, _ in enumerate(sRaw):
                    abBuf.append(ord(sRaw[i]));
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.
            # An empty buffer means end of file: send the final CRC only.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA    ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.
        # NOTE(review): only the local read failure sets rc to None above;
        # ACK failures leave rc as False and do not reach this branch.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1159
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile via
    taskDownloadCommon.

    Returns False (logged) if the local file cannot be created, otherwise
    the taskDownloadCommon() result.  When that result is False the
    partially written local file is removed again.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Always release the file handle, also if the common worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    # Do not leave an incomplete file behind.
    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1176
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Downloads sRemoteFile via taskDownloadCommon and returns its content as
    a decoded string.

    The received chunks are joined before decoding, so a multi-byte
    character that straddles two data packets is decoded correctly instead
    of being dropped ('ignore') or raising ('strict').

    Returns the decoded string on success, otherwise the (non-True)
    taskDownloadCommon() result.
    """
    class OutStringFile(object): # pylint: disable=too-few-public-methods
        """ Collects everything written to it in the asContent list. """
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            self.asContent.append(sBuf);
            return None;

    oLocalString = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
    if rc is True:
        sErrors   = 'ignore' if fIgnoreEncodingErrors else 'strict';
        asParts   = [];
        abPending = [];     # Run of consecutive, not yet decoded chunks.
        for sBuf in oLocalString.asContent:
            if hasattr(sBuf, 'decode'):
                abPending.append(sBuf);
            else:
                # Already text: flush the pending bytes first to keep the order.
                if abPending:
                    asParts.append(b''.join(abPending).decode(sEncoding, sErrors));
                    abPending = [];
                asParts.append(sBuf);
        if abPending:
            asParts.append(b''.join(abPending).decode(sEncoding, sErrors));
        rc = ''.join(asParts);
    return rc;
1197
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends GET FILE and then processes DATA packets until DATA EOF,
    verifying the running CRC-32 carried in the first four payload bytes of
    each packet and writing the remainder to oLocalFile (which only needs a
    write() method).  Every accepted packet is answered with ACK.

    Returns True on success.  Returns False on error replies and, after
    sending NACK, on validation and local write errors.  A failing
    sendMsg() result is returned as-is.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.
            sOpcode = sOpcode.rstrip();
            if sOpcode not in ('DATA', 'DATA EOF',):
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                abData = abPayload[4:];
                # array.tostring() was removed in Python 3.9; prefer tobytes()
                # and keep tostring() for interpreters that lack it.
                if hasattr(abData, 'tobytes'):
                    oLocalFile.write(abData.tobytes());
                else:
                    oLocalFile.write(abData.tostring());
            except:
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1256
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends UNPKFILE with (sRemoteFile, sRemoteDir) and collects the logged
    ACK.

    Returns the sendMsg() result if sending failed, else the
    recvAckLogged() result.
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1262
1263 # pylint: enable=missing-docstring
1264
1265
1266 #
1267 # Public methods - generic task queries
1268 #
1269
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    A task counts as successful when the status string is empty and the
    task result is neither False nor None.
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    oResult    = self.oTaskRc;
    self.unlockTask();
    return sCurStatus == "" and oResult is not False and oResult is not None;
1281
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None if not completed yet (non-empty status) or no previous task.
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    oResult    = self.oTaskRc;
    self.unlockTask();
    return oResult if sCurStatus == "" else None;
1294
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1304
1305 #
1306 # Public methods - connection.
1307 #
1308
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "bye" task (taskBye) that disconnects the session.

    Returns the startTask() result: True on success, False on failure
    (logged).

    The task returns True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1318
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect, run through asyncToSync."""
    aoArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDisconnect, *aoArgs);
1322
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The task is registered as "uuid"; it used to be mislabelled "bye"
    # (copy & paste from asyncDisconnect), which made the task status and
    # logging claim a disconnect was in progress.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1332
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid, run through asyncToSync."""
    aoArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUuid, *aoArgs);
1336
1337 #
1338 # Public methods - execution.
1339 #
1340
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts an "exec" task (taskExecEx) running sExecName on the TXS side.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe specify how to deal with
    these streams.  If None, no special action is taken and the output goes
    to where ever the TXS sends its output, and ditto for input.
        - To send to / read from the bitbucket, pass '/dev/null'.
        - To redirect to/from a file, just specify the remote filename.
        - To append to a file use '>>' followed by the remote filename.
        - To pipe the stream to/from the TXS, specify a file like
          object.  For StdIn a non-blocking read() method is required. For
          the other a write() method is required.  Watch out for deadlock
          conditions between StdIn and StdOut/StdErr/TestPipe piping.
    """
    # The second task argument is always long(0).
    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                  oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1369
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx, run through asyncToSync.

    Note that all four streams default to '/dev/null' here.
    """
    aoArgs = (sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
              oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExecEx, *aoArgs);
1377
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts an "exec" task (taskExecEx) for a test process.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    Standard input is taken from /dev/null.  Standard output and standard
    error are wrapped in reporter.FileWrapper objects named
    '<sPrefix>stdout' and '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe when fWithTestPipe is set, otherwise to
    /dev/null.
    """
    oStdOut   = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr   = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1404
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec, run through asyncToSync."""
    aoArgs = (sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExec, *aoArgs);
1410
1411 #
1412 # Public methods - system
1413 #
1414
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "reboot" task (taskReboot).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, aoTaskArgs);
1425
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot, run through asyncToSync."""
    aoArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncReboot, *aoArgs);
1429
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "shutdown" task (taskShutdown).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, aoTaskArgs);
1439
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown, run through asyncToSync."""
    aoArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncShutdown, *aoArgs);
1443
1444
1445 #
1446 # Public methods - file system
1447 #
1448
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "mkDir" task (taskMkDir) for sRemoteDir with mode fMode.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, aoTaskArgs);
1458
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDir, run through asyncToSync."""
    aoArgs = (sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkDir, *aoArgs);
1462
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "mkDirPath" task (taskMkDirPath), i.e. a mkdir -p, for
    sRemoteDir with mode fMode.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, aoTaskArgs);
1472
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkDirPath, run through asyncToSync."""
    aoArgs = (sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkDirPath, *aoArgs);
1476
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "mkSymlink" task (taskMkSymlink) creating sLink pointing at
    sLinkTarget.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, aoTaskArgs);
1486
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink, run through asyncToSync."""
    aoArgs = (sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkSymlink, *aoArgs);
1490
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "rmDir" task (taskRmDir) for sRemoteDir.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, aoTaskArgs);
1500
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir, run through asyncToSync."""
    aoArgs = (sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmDir, *aoArgs);
1504
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "rmFile" task (taskRmFile) for sRemoteFile.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, aoTaskArgs);
1514
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile, run through asyncToSync."""
    aoArgs = (sRemoteFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmFile, *aoArgs);
1518
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "rmSymlink" task (taskRmSymlink) for sRemoteSymlink.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, aoTaskArgs);
1528
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink, run through asyncToSync."""
    aoArgs = (sRemoteSymlink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmSymlink, *aoArgs);
1532
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "rmTree" task (taskRmTree) for sRemoteTree.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, aoTaskArgs);
1542
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree, run through asyncToSync."""
    aoArgs = (sRemoteTree, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmTree, *aoArgs);
1546
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "chMod" task (taskChMod) setting mode fMode on sRemotePath.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemotePath, fMode);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, aoTaskArgs);
1556
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncChMod, run through asyncToSync."""
    aoArgs = (sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncChMod, *aoArgs);
1560
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "chOwn" task (taskChOwn) setting owner idUser and group
    idGroup on sRemotePath.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemotePath, idUser, idGroup);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, aoTaskArgs);
1570
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn.

    Returns what asyncToSync() returns for the chown task.
    """
    # This used to dispatch to asyncChMod (copy & paste slip), which would
    # have treated idUser as a mode and shifted every following argument.
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1574
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "isDir" query task (taskIsDir) for sRemoteDir.

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, aoTaskArgs);
1585
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir, run through asyncToSync."""
    aoArgs = (sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsDir, *aoArgs);
1589
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "isFile" query task (taskIsFile) for sRemoteFile.

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, aoTaskArgs);
1600
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile, run through asyncToSync."""
    aoArgs = (sRemoteFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsFile, *aoArgs);
1604
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "isSymlink" query task (taskIsSymlink) for sRemoteSymlink.

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, aoTaskArgs);
1615
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink, run through asyncToSync."""
    aoArgs = (sRemoteSymlink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsSymlink, *aoArgs);
1619
1620 #def "STAT "
1621 #def "LSTAT "
1622 #def "LIST "
1623
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "upload" task (taskUploadFile) sending sLocalFile to
    sRemoteFile.  fMode and fFallbackOkay are handed on to the task.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, aoTaskArgs);
1635
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadFile, run through asyncToSync."""
    aoArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadFile, *aoArgs);
1639
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "uploadString" task (taskUploadString) writing sContent to
    sRemoteFile.  fMode and fFallbackOkay are handed on to the task.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sContent, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, aoTaskArgs);
1651
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadString, run through asyncToSync."""
    aoArgs = (sContent, sRemoteFile, fMode, fFallbackOkay, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadString, *aoArgs);
1655
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "downloadFile" task (taskDownloadFile) fetching sRemoteFile
    into sLocalFile.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, aoTaskArgs);
1665
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile, run through asyncToSync."""
    aoArgs = (sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadFile, *aoArgs);
1669
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "downloadString" task (taskDownloadString) fetching the
    content of sRemoteFile.  sEncoding and fIgnoreEncodingErrors are handed
    on to the task.

    Returns True on success, False on failure (logged).

    The task returns the file content as a string on success, False on
    failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, aoTaskArgs);
1681
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString, run through asyncToSync."""
    aoArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadString, *aoArgs);
1687
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts the "unpackFile" task (taskUnpackFile) unpacking sRemoteFile
    into sRemoteDir.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, aoTaskArgs);
1698
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile, run through asyncToSync."""
    aoArgs = (sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUnpackFile, *aoArgs);
1702
1703
1704class TransportTcp(TransportBase):
1705 """
1706 TCP transport layer for the TXS client session class.
1707 """
1708
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Records the connection parameters; no socket is created here.  The
    session will call us back to make the connection later on its worker
    thread.

    When uPort is None the port defaults to 5042, unless fReversedSetup is
    anything other than False, in which case it defaults to 5048.
    """
    TransportBase.__init__(self, utils.getCallerName());

    # Connection parameters.
    self.sHostname          = sHostname;
    self.fReversedSetup     = fReversedSetup;
    if uPort is not None:
        self.uPort          = uPort;
    elif fReversedSetup is False:
        self.uPort          = 5042;
    else:
        self.uPort          = 5048;

    # Connection state.
    self.oSocket            = None;
    self.oWakeupW           = None;
    self.oWakeupR           = None;
    self.fConnectCanceled   = False;
    self.fIsConnecting      = False;
    self.oCv                = threading.Condition();
    self.abReadAhead        = array.array('B');
1725
def toString(self):
    """ Returns a one-line description of the transport, embedding the TransportBase one. """
    sFmt = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
           ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    aoValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
                self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFmt % aoValues;
1731
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    True if oXcpt is a socket.error whose errno is EINPROGRESS or
    EWOULDBLOCK (the latter for Windows), otherwise False.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
        for sErrNm in ('EINPROGRESS', 'EWOULDBLOCK'):
            # Each lookup is guarded on its own: a missing errno constant or
            # attribute must not prevent checking the remaining candidate.
            try:
                if oXcpt.errno == getattr(errno, sErrNm):
                    return True;
            except: pass;
    except:
        pass;
    return False;
1748
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    True if oXcpt is a socket.error whose errno is EWOULDBLOCK or EAGAIN,
    otherwise False.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
        for sErrNm in ('EWOULDBLOCK', 'EAGAIN'):
            # Each lookup is guarded on its own: a missing errno constant or
            # attribute must not prevent checking the remaining candidate.
            try:
                if oXcpt.errno == getattr(errno, sErrNm):
                    return True;
            except: pass;
    except:
        pass;
    return False;
1764
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by peer or others.

    True if oXcpt is a socket.error whose errno is ECONNRESET or ENETRESET,
    otherwise False.
    """
    try:
        if not isinstance(oXcpt, socket.error):
            return False;
        for sErrNm in ('ECONNRESET', 'ENETRESET'):
            # Each lookup is guarded on its own: a missing errno constant or
            # attribute must not prevent checking the remaining candidate.
            try:
                if oXcpt.errno == getattr(errno, sErrNm):
                    return True;
            except: pass;
    except:
        pass;
    return False;
1780
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets (read end first, then write end) and resets
    both members to None.  Caller should own the CV.
    """
    for sMember in ('oWakeupR', 'oWakeupW'):
        oWakeup = getattr(self, sMember);
        # Clear the member before closing, like a swap.
        setattr(self, sMember, None);
        if oWakeup is not None:
            oWakeup.close();
    return None;
1794
def cancelConnect(self):
    """
    Cancels a connect attempt.

    Sets fConnectCanceled and, if a connect is in progress, closes the
    connecting socket and pokes the write end of the wakeup socket pair so
    that a select() waiting on the read end returns.
    """
    # This is bad stuff.
    # The with statement guarantees that the CV is released again even if
    # something below raises.
    with self.oCv:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                # Must be bytes: socket.send() rejects str on Python 3, which
                # previously turned the wakeup write into a logged TypeError.
                try:    oWakeupW.send(b'cancelled!\n');
                except: reporter.logXcpt();
                try:    oWakeupW.shutdown(socket.SHUT_WR);
                except: reporter.logXcpt();
                oWakeupW.close();
1817
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (sHostname, uPort), listens and waits up to cMsTimeout
    milliseconds for one connection.  oWakeupR is the read end of the
    wakeup socket pair (may be None) which lets cancelConnect interrupt the
    wait.

    Returns True when a client connected (the accepted socket replaces
    self.oSocket unless the connect was canceled meanwhile), False when the
    wait timed out, and None on failure or cancellation.  Unexpected socket
    errors are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr   = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as e:
        if not self.__isInProgressXcpt(e):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
        # The wakeup socket is None when no socket pair could be created;
        # select() does not accept None entries, so leave it out then.
        aoWaitOn = [oSocket,] if oWakeupR is None else [oSocket, oWakeupR];
        try:
            select.select(aoWaitOn, [], aoWaitOn, cMsTimeout / 1000.0);
        except socket.error as oXcptSel:
            # Use .errno; indexing the exception does not work on Python 3.
            if oXcptSel.errno != errno.EBADF  or  not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptAcc:
            if not self.__isInProgressXcpt(oXcptAcc):
                if oXcptAcc.errno != errno.EBADF  or  not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    if not self.fConnectCanceled:
        self.oSocket.close();
        self.oSocket = oClientSocket;
        self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
    self.oCv.release();
    return True;
1890
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Starts a connect on oSocket to (sHostname, uPort) and, if it is
    reported as in progress, waits up to cMsTimeout milliseconds for it to
    complete.  oWakeupR is the read end of the wakeup socket pair (may be
    None) which lets cancelConnect interrupt the wait.

    Returns True when connected, False when the caller should try again,
    and None on other failures (logged) or cancellation.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        iRc = oXcpt.errno;
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            # The wakeup socket is None when no socket pair could be created;
            # select() does not accept None entries, so leave it out then.
            aoWakeup = [] if oWakeupR is None else [oWakeupR,];
            try:
                ttRc = select.select(aoWakeup, [oSocket], [oSocket,] + aoWakeup, cMsTimeout / 1000.0);
                # Neither writable nor in error: timed out or woken up.
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error as oXcpt2:
                iRc = oXcpt2.errno;
            except:
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        if rc is True:
            pass;
        elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
            rc = False; # try again.
        else:
            # EBADF is expected when cancelConnect closed the socket under us.
            if iRc != errno.EBADF  or  not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
1928
1929
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket and tries to establish the connection.

    Depending on self.fReversedSetup the actual work is done by
    _connectAsServer() or _connectAsClient(); cMsTimeout (milliseconds) is
    passed on to them as the wait limit.

    Returns True when connected.  Otherwise the sockets are closed again and
    the helper's result (False or None) is returned.  None is also returned
    when the socket cannot be created or switched to non-blocking mode.  A
    connection that completes although self.fConnectCanceled is set is
    reported as False.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:
            (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
        except:
            reporter.logXcpt('socket.socketpair() failed');

    # Publish the sockets on the instance unless the connect was canceled
    # already.  Remember whether we did, because unpublished sockets are not
    # reachable by the cleanup below and must be closed separately.
    fPublished = False;
    self.oCv.acquire();
    try:
        if not self.fConnectCanceled:
            self.oSocket = oSocket;
            self.oWakeupW = oWakeupW;
            self.oWakeupR = oWakeupR;
            self.fIsConnecting = True;
            fPublished = True;
    finally:
        self.oCv.release();

    # Try connect.  Without a wakeup pair the socket itself is passed in its
    # place to avoid a select failure.
    oSelectWakeup = oWakeupR if oWakeupR is not None else oSocket;
    if self.fReversedSetup:
        rc = self._connectAsServer(oSocket, oSelectWakeup, cMsTimeout);
    else:
        rc = self._connectAsClient(oSocket, oSelectWakeup, cMsTimeout);

    # Update the state and cleanup on failure/cancel.  The lock is released
    # even if closing a socket raises.
    self.oCv.acquire();
    try:
        if rc is True and (self.fConnectCanceled or not fPublished):
            rc = False;
        self.fIsConnecting = False;

        if rc is not True:
            if self.oSocket is not None:
                self.oSocket.close();
                self.oSocket = None;
            self._closeWakeupSockets();
    finally:
        self.oCv.release();

    # Sockets that never made it onto the instance would otherwise leak.
    if not fPublished:
        for oOrphan in (oSocket, oWakeupR, oWakeupW):
            if oOrphan is not None:
                try:
                    oOrphan.close();
                except Exception:
                    reporter.logXcpt('closing unpublished socket failed');

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
1986
def disconnect(self, fQuiet = False):
    """
    Shuts down and closes the connection socket, if there is one, and
    closes the wakeup sockets.

    The write side is shut down first and pending input is drained before
    the socket is closed.  fQuiet suppresses the error that is otherwise
    logged when shutdown() fails.

    self.oCv is held while the sockets are closed and is released even when
    closing raises.  In that case the exception still propagates, but
    self.oSocket has already been reset to None.
    """
    oSocket = self.oSocket;
    if oSocket is not None:
        self.abReadAhead = array.array('B');

        # Try shutting down the socket gracefully (draining it).
        try:
            oSocket.shutdown(socket.SHUT_WR);
        except Exception:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            oSocket.setblocking(0); # just in case it's not set.
            while oSocket.recv(16384):
                pass;
        except Exception:
            # Best effort only: the non-blocking recv() raising is what
            # normally ends the drain loop when no more data is pending.
            pass;

    # Close it.
    self.oCv.acquire();
    try:
        if oSocket is not None:
            self.oSocket = None;
            try:
                oSocket.setblocking(1);
            except Exception:
                pass;
            oSocket.close();
    finally:
        try:
            self._closeWakeupSockets();
        finally:
            self.oCv.release();
2015
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf over the connection.

    A plain send() is tried first.  If that does not take everything,
    select() is used to wait for the socket to become writable again, for
    at most cMsTimeout milliseconds in total.

    Returns True once every byte has been sent, False when there is no
    connection, on error and on timeout.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Optimistic first attempt without any timeout handling.
    try:
        cbDone = self.oSocket.send(abBuf);
        if cbDone == len(abBuf):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbDone = 0;

    # Timed send of whatever is left.
    msBegin = base.timestampMilli();
    cMsWaited = base.timestampMilli() - msBegin;
    while cMsWaited <= cMsTimeout:
        # Wait until the socket is writable.
        try:
            (_, aoWritable, aoFailed) = select.select([], [self.oSocket], [self.oSocket],
                                                      (cMsTimeout - cMsWaited) / 1000.0);
            if aoFailed and not aoWritable:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                return False;
            if not aoWritable:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Push out more of the buffer.
        try:
            cbDone += self.oSocket.send(abBuf[cbDone:]);
            if cbDone == len(abBuf):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;

        cMsWaited = base.timestampMilli() - msBegin;

    reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
    return False;
2064
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from self.abReadAhead and returns them.
    The caller must make sure that at least cb bytes are buffered.
    """
    abBuffered = self.abReadAhead;
    assert(cb <= len(abBuffered));
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
2071
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, waiting at most cMsTimeout milliseconds.

    Incoming data is collected in self.abReadAhead; once cb bytes are
    buffered they are removed from it and returned.  Data received before a
    timeout stays in the buffer.

    fNoDataOk suppresses the error log entry when the wait times out (or
    the connection is reset) while the buffer is still empty.

    Returns the requested bytes, or None when there is no connection, on
    error, on timeout and when the peer has shut down the connection (in
    which case disconnect() is called as well).
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msBegin = base.timestampMilli();
    cMsWaited = base.timestampMilli() - msBegin;
    while cMsWaited <= cMsTimeout:
        # Wait for the socket to become readable.
        try:
            (aoReadable, _, aoFailed) = select.select([self.oSocket], [], [self.oSocket],
                                                      (cMsTimeout - cMsWaited) / 1000.0);
            if aoFailed and not aoReadable:
                reporter.error('TranportTcp.recvBytes: select returned with exception');
                return None;
            if not aoReadable:
                if not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Try read more.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abChunk:
                # An empty read on a readable socket means the peer is gone.
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

        cMsWaited = base.timestampMilli() - msBegin;

    if not fNoDataOk or self.abReadAhead:
        reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
    return None;
2139
def isConnectionOk(self):
    """
    Checks whether the connection still looks usable.

    Returns False when there is no socket, when select() reports an
    exceptional condition on it, or when the probing calls raise.  Returns
    True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        (_, _, aoFailed) = select.select([], [], [self.oSocket], 0.0);
        fOk = not aoFailed;
        if fOk:
            self.oSocket.send(array.array('B')); # send zero bytes.
    except:
        fOk = False;
    return fOk;
2152
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket is readable, waiting up to cMsTimeout
    milliseconds for that to happen.

    Returns False only when select() completes without reporting the socket
    as readable.  A failing select() call is reported as True.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return bool(aoReadable);
2161
2162
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp instance is created from sHostname, uPort and
    fReversedSetup and handed to a new Session together with cMsTimeout,
    cMsIdleFudge and fnProcessEvents.

    The optional fnProcessEvents callback should be set to
    vbox.processPendingEvents or similar.

    Returns the session object, or None if constructing it raised (the
    exception is logged as an error).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2179
2180
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    Works like openTcpSession, except that the Session is created with
    fTryConnect = True so that a connection failure is not logged as an
    error.

    Returns the session object, or None if constructing it raised (that
    exception is still logged as an error).
    """
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                           fTryConnect = True, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2195
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette