VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 79219

Last change on this file since 79219 was 79219, checked in by vboxsync, 5 years ago

txsclient.py: Fixed bad logging call in asyncToSync.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 80.1 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 79219 2019-06-18 21:50:00Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2019 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 79219 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import zlib;
41import uuid;
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49# Python 3 hacks:
50if sys.version_info[0] >= 3:
51 long = int; # pylint: disable=redefined-builtin,invalid-name
52
53#
54# Helpers for decoding data received from the TXS.
55# These are used both the Session and Transport classes.
56#
57
58def getU32(abData, off):
59 """Get a U32 field."""
60 return abData[off] \
61 + abData[off + 1] * 256 \
62 + abData[off + 2] * 65536 \
63 + abData[off + 3] * 16777216;
64
65def getSZ(abData, off, sDefault = None):
66 """
67 Get a zero-terminated string field.
68 Returns sDefault if the string is invalid.
69 """
70 cchStr = getSZLen(abData, off);
71 if cchStr >= 0:
72 abStr = abData[off:(off + cchStr)];
73 try:
74 return abStr.tostring().decode('utf_8');
75 except:
76 reporter.errorXcpt('getSZ(,%u)' % (off));
77 return sDefault;
78
79def getSZLen(abData, off):
80 """
81 Get the length of a zero-terminated string field, in bytes.
82 Returns -1 if off is beyond the data packet or not properly terminated.
83 """
84 cbData = len(abData);
85 if off >= cbData:
86 return -1;
87
88 offCur = off;
89 while abData[offCur] != 0:
90 offCur = offCur + 1;
91 if offCur >= cbData:
92 return -1;
93
94 return offCur - off;
95
96def isValidOpcodeEncoding(sOpcode):
97 """
98 Checks if the specified opcode is valid or not.
99 Returns True on success.
100 Returns False if it is invalid, details in the log.
101 """
102 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
103 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
104 if len(sOpcode) != 8:
105 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
106 return False;
107 for i in range(0, 1):
108 if sSet1.find(sOpcode[i]) < 0:
109 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
110 return False;
111 for i in range(2, 7):
112 if sSet2.find(sOpcode[i]) < 0:
113 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
114 return False;
115 return True;
116
117#
118# Helper for encoding data sent to the TXS.
119#
120
121def u32ToByteArray(u32):
122 """Encodes the u32 value as a little endian byte (B) array."""
123 return array.array('B', \
124 ( u32 % 256, \
125 (u32 // 256) % 256, \
126 (u32 // 65536) % 256, \
127 (u32 // 16777216) % 256) );
128
129
130
131class TransportBase(object):
132 """
133 Base class for the transport layer.
134 """
135
136 def __init__(self, sCaller):
137 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
138 self.fDummy = 0;
139 self.abReadAheadHdr = array.array('B');
140
141 def toString(self):
142 """
143 Stringify the instance for logging and debugging.
144 """
145 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
146
147 def __str__(self):
148 return self.toString();
149
150 def cancelConnect(self):
151 """
152 Cancels any pending connect() call.
153 Returns None;
154 """
155 return None;
156
157 def connect(self, cMsTimeout):
158 """
159 Quietly attempts to connect to the TXS.
160
161 Returns True on success.
162 Returns False on retryable errors (no logging).
163 Returns None on fatal errors with details in the log.
164
165 Override this method, don't call super.
166 """
167 _ = cMsTimeout;
168 return False;
169
170 def disconnect(self, fQuiet = False):
171 """
172 Disconnect from the TXS.
173
174 Returns True.
175
176 Override this method, don't call super.
177 """
178 _ = fQuiet;
179 return True;
180
181 def sendBytes(self, abBuf, cMsTimeout):
182 """
183 Sends the bytes in the buffer abBuf to the TXS.
184
185 Returns True on success.
186 Returns False on failure and error details in the log.
187
188 Override this method, don't call super.
189
190 Remarks: len(abBuf) is always a multiple of 16.
191 """
192 _ = abBuf; _ = cMsTimeout;
193 return False;
194
195 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
196 """
197 Receive cb number of bytes from the TXS.
198
199 Returns the bytes (array('B')) on success.
200 Returns None on failure and error details in the log.
201
202 Override this method, don't call super.
203
204 Remarks: cb is always a multiple of 16.
205 """
206 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
207 return None;
208
209 def isConnectionOk(self):
210 """
211 Checks if the connection is OK.
212
213 Returns True if it is.
214 Returns False if it isn't (caller should call diconnect).
215
216 Override this method, don't call super.
217 """
218 return True;
219
220 def isRecvPending(self, cMsTimeout = 0):
221 """
222 Checks if there is incoming bytes, optionally waiting cMsTimeout
223 milliseconds for something to arrive.
224
225 Returns True if there is, False if there isn't.
226
227 Override this method, don't call super.
228 """
229 _ = cMsTimeout;
230 return False;
231
232 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
233 """
234 Sends a message (opcode + encoded payload).
235
236 Returns True on success.
237 Returns False on failure and error details in the log.
238 """
239 # Fix + check the opcode.
240 if len(sOpcode) < 2:
241 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
242 return False;
243 sOpcode = sOpcode.ljust(8);
244 if not isValidOpcodeEncoding(sOpcode):
245 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
246 return False;
247
248 # Start construct the message.
249 cbMsg = 16 + len(abPayload);
250 abMsg = array.array('B');
251 abMsg.extend(u32ToByteArray(cbMsg));
252 abMsg.extend((0, 0, 0, 0)); # uCrc32
253 try:
254 abMsg.extend(array.array('B', \
255 ( ord(sOpcode[0]), \
256 ord(sOpcode[1]), \
257 ord(sOpcode[2]), \
258 ord(sOpcode[3]), \
259 ord(sOpcode[4]), \
260 ord(sOpcode[5]), \
261 ord(sOpcode[6]), \
262 ord(sOpcode[7]) ) ) );
263 if abPayload:
264 abMsg.extend(abPayload);
265 except:
266 reporter.fatalXcpt('sendMsgInt: packing problem...');
267 return False;
268
269 # checksum it, padd it and send it off.
270 uCrc32 = zlib.crc32(abMsg[8:]);
271 abMsg[4:8] = u32ToByteArray(uCrc32);
272
273 while len(abMsg) % 16:
274 abMsg.append(0);
275
276 reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
277 return self.sendBytes(abMsg, cMsTimeout);
278
279 def recvMsg(self, cMsTimeout, fNoDataOk = False):
280 """
281 Receives a message from the TXS.
282
283 Returns the message three-tuple: length, opcode, payload.
284 Returns (None, None, None) on failure and error details in the log.
285 """
286
287 # Read the header.
288 if self.abReadAheadHdr:
289 assert(len(self.abReadAheadHdr) == 16);
290 abHdr = self.abReadAheadHdr;
291 self.abReadAheadHdr = array.array('B');
292 else:
293 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
294 if abHdr is None:
295 return (None, None, None);
296 if len(abHdr) != 16:
297 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
298 return (None, None, None);
299
300 # Unpack and validate the header.
301 cbMsg = getU32(abHdr, 0);
302 uCrc32 = getU32(abHdr, 4);
303 sOpcode = abHdr[8:16].tostring().decode('ascii');
304
305 if cbMsg < 16:
306 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
307 return (None, None, None);
308 if cbMsg > 1024*1024:
309 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
310 return (None, None, None);
311 if not isValidOpcodeEncoding(sOpcode):
312 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
313 return (None, None, None);
314
315 # Get the payload (if any), dropping the padding.
316 abPayload = array.array('B');
317 if cbMsg > 16:
318 if cbMsg % 16:
319 cbPadding = 16 - (cbMsg % 16);
320 else:
321 cbPadding = 0;
322 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
323 if abPayload is None:
324 self.abReadAheadHdr = abHdr;
325 if not fNoDataOk :
326 reporter.log('recvMsg: failed to recv payload bytes!');
327 return (None, None, None);
328
329 while cbPadding > 0:
330 abPayload.pop();
331 cbPadding = cbPadding - 1;
332
333 # Check the CRC-32.
334 if uCrc32 != 0:
335 uActualCrc32 = zlib.crc32(abHdr[8:]);
336 if cbMsg > 16:
337 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
338 uActualCrc32 = uActualCrc32 & 0xffffffff;
339 if uCrc32 != uActualCrc32:
340 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
341 return (None, None, None);
342
343 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
344 return (cbMsg, sOpcode, abPayload);
345
346 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
347 """
348 Sends a message (opcode + payload tuple).
349
350 Returns True on success.
351 Returns False on failure and error details in the log.
352 Returns None if you pass the incorrectly typed parameters.
353 """
354 # Encode the payload.
355 abPayload = array.array('B');
356 for o in aoPayload:
357 try:
358 if utils.isString(o):
359 if sys.version_info[0] >= 3:
360 abPayload.extend(o.encode('utf_8'));
361 else:
362 # the primitive approach...
363 sUtf8 = o.encode('utf_8');
364 for ch in sUtf8:
365 abPayload.append(ord(ch))
366 abPayload.append(0);
367 elif isinstance(o, long):
368 if o < 0 or o > 0xffffffff:
369 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
370 return None;
371 abPayload.extend(u32ToByteArray(o));
372 elif isinstance(o, array.array):
373 abPayload.extend(o);
374 else:
375 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
376 return None;
377 except:
378 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
379 return None;
380 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
381
382
383class Session(TdTaskBase):
384 """
385 A Test eXecution Service (TXS) client session.
386 """
387
388 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
389 """
390 Construct a TXS session.
391
392 This starts by connecting to the TXS and will enter the signalled state
393 when connected or the timeout has been reached.
394 """
395 TdTaskBase.__init__(self, utils.getCallerName());
396 self.oTransport = oTransport;
397 self.sStatus = "";
398 self.cMsTimeout = 0;
399 self.fErr = True; # Whether to report errors as error.
400 self.msStart = 0;
401 self.oThread = None;
402 self.fnTask = self.taskDummy;
403 self.aTaskArgs = None;
404 self.oTaskRc = None;
405 self.t3oReply = (None, None, None);
406 self.fScrewedUpMsgState = False;
407 self.fTryConnect = fTryConnect;
408
409 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
410 raise base.GenError("startTask failed");
411
412 def __del__(self):
413 """Make sure to cancel the task when deleted."""
414 self.cancelTask();
415
416 def toString(self):
417 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
418 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
419 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
420 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
421
422 def taskDummy(self):
423 """Place holder to catch broken state handling."""
424 raise Exception();
425
426 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
427 """
428 Kicks of a new task.
429
430 cMsTimeout: The task timeout in milliseconds. Values less than
431 500 ms will be adjusted to 500 ms. This means it is
432 OK to use negative value.
433 sStatus: The task status.
434 fnTask: The method that'll execute the task.
435 aArgs: Arguments to pass to fnTask.
436
437 Returns True on success, False + error in log on failure.
438 """
439 if not self.cancelTask():
440 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
441 return False;
442
443 # Change status and make sure we're the
444 self.lockTask();
445 if self.sStatus != "":
446 self.unlockTask();
447 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
448 return False;
449 self.sStatus = "setup";
450 self.oTaskRc = None;
451 self.t3oReply = (None, None, None);
452 self.resetTaskLocked();
453 self.unlockTask();
454
455 self.cMsTimeout = max(cMsTimeout, 500);
456 self.fErr = not fIgnoreErrors;
457 self.fnTask = fnTask;
458 self.aTaskArgs = aArgs;
459 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
460 self.oThread.setDaemon(True);
461 self.msStart = base.timestampMilli();
462
463 self.lockTask();
464 self.sStatus = sStatus;
465 self.unlockTask();
466 self.oThread.start();
467
468 return True;
469
470 def cancelTask(self, fSync = True):
471 """
472 Attempts to cancel any pending tasks.
473 Returns success indicator (True/False).
474 """
475 self.lockTask();
476
477 if self.sStatus == "":
478 self.unlockTask();
479 return True;
480 if self.sStatus == "setup":
481 self.unlockTask();
482 return False;
483 if self.sStatus == "cancelled":
484 self.unlockTask();
485 return False;
486
487 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
488 if self.sStatus == 'connecting':
489 self.oTransport.cancelConnect();
490
491 self.sStatus = "cancelled";
492 oThread = self.oThread;
493 self.unlockTask();
494
495 if not fSync:
496 return False;
497
498 oThread.join(61.0);
499 return oThread.isAlive();
500
501 def taskThread(self):
502 """
503 The task thread function.
504 This does some housekeeping activities around the real task method call.
505 """
506 if not self.isCancelled():
507 try:
508 fnTask = self.fnTask;
509 oTaskRc = fnTask(*self.aTaskArgs);
510 except:
511 reporter.fatalXcpt('taskThread', 15);
512 oTaskRc = None;
513 else:
514 reporter.log('taskThread: cancelled already');
515
516 self.lockTask();
517
518 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
519 self.oTaskRc = oTaskRc;
520 self.oThread = None;
521 self.sStatus = '';
522 self.signalTaskLocked();
523
524 self.unlockTask();
525 return None;
526
527 def isCancelled(self):
528 """Internal method for checking if the task has been cancelled."""
529 self.lockTask();
530 sStatus = self.sStatus;
531 self.unlockTask();
532 if sStatus == "cancelled":
533 return True;
534 return False;
535
536 def hasTimedOut(self):
537 """Internal method for checking if the task has timed out or not."""
538 cMsLeft = self.getMsLeft();
539 if cMsLeft <= 0:
540 return True;
541 return False;
542
543 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
544 """Gets the time left until the timeout."""
545 cMsElapsed = base.timestampMilli() - self.msStart;
546 if cMsElapsed < 0:
547 return cMsMin;
548 cMsLeft = self.cMsTimeout - cMsElapsed;
549 if cMsLeft <= cMsMin:
550 return cMsMin;
551 if cMsLeft > cMsMax > 0:
552 return cMsMax
553 return cMsLeft;
554
555 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
556 """
557 Wrapper for TransportBase.recvMsg that stashes the response away
558 so the client can inspect it later on.
559 """
560 if cMsTimeout is None:
561 cMsTimeout = self.getMsLeft(500);
562 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
563 self.lockTask();
564 self.t3oReply = (cbMsg, sOpcode, abPayload);
565 self.unlockTask();
566 return (cbMsg, sOpcode, abPayload);
567
568 def recvAck(self, fNoDataOk = False):
569 """
570 Receives an ACK or error response from the TXS.
571
572 Returns True on success.
573 Returns False on timeout or transport error.
574 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
575 and there are always details of some sort or another.
576 """
577 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
578 if cbMsg is None:
579 return False;
580 sOpcode = sOpcode.strip()
581 if sOpcode == "ACK":
582 return True;
583 return (sOpcode, getSZ(abPayload, 0, sOpcode));
584
585 def recvAckLogged(self, sCommand, fNoDataOk = False):
586 """
587 Wrapper for recvAck and logging.
588 Returns True on success (ACK).
589 Returns False on time, transport error and errors signalled by TXS.
590 """
591 rc = self.recvAck(fNoDataOk);
592 if rc is not True and not fNoDataOk:
593 if rc is False:
594 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
595 else:
596 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
597 rc = False;
598 return rc;
599
600 def recvTrueFalse(self, sCommand):
601 """
602 Receives a TRUE/FALSE response from the TXS.
603 Returns True on TRUE, False on FALSE and None on error/other (logged).
604 """
605 cbMsg, sOpcode, abPayload = self.recvReply();
606 if cbMsg is None:
607 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
608 return None;
609
610 sOpcode = sOpcode.strip()
611 if sOpcode == "TRUE":
612 return True;
613 if sOpcode == "FALSE":
614 return False;
615 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
616 return None;
617
618 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
619 """
620 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
621 """
622 if cMsTimeout is None:
623 cMsTimeout = self.getMsLeft(500);
624 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
625
626 def asyncToSync(self, fnAsync, *aArgs):
627 """
628 Wraps an asynchronous task into a synchronous operation.
629
630 Returns False on failure, task return status on success.
631 """
632 rc = fnAsync(*aArgs);
633 if rc is False:
634 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
635 return rc;
636
637 rc = self.waitForTask(self.cMsTimeout + 5000);
638 if rc is False:
639 reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask failed...');
640 self.cancelTask();
641 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
642 return False;
643
644 rc = self.getResult();
645 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
646 return rc;
647
648 #
649 # Connection tasks.
650 #
651
652 def taskConnect(self, cMsIdleFudge):
653 """Tries to connect to the TXS"""
654 while not self.isCancelled():
655 reporter.log2('taskConnect: connecting ...');
656 rc = self.oTransport.connect(self.getMsLeft(500));
657 if rc is True:
658 reporter.log('taskConnect: succeeded');
659 return self.taskGreet(cMsIdleFudge);
660 if rc is None:
661 reporter.log2('taskConnect: unable to connect');
662 return None;
663 if self.hasTimedOut():
664 reporter.log2('taskConnect: timed out');
665 if not self.fTryConnect:
666 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
667 return False;
668 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
669 if not self.fTryConnect:
670 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
671 return False;
672
673 def taskGreet(self, cMsIdleFudge):
674 """Greets the TXS"""
675 rc = self.sendMsg("HOWDY", ());
676 if rc is True:
677 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
678 if rc is True:
679 while cMsIdleFudge > 0:
680 cMsIdleFudge -= 1000;
681 time.sleep(1);
682 else:
683 self.oTransport.disconnect(self.fTryConnect);
684 return rc;
685
686 def taskBye(self):
687 """Says goodbye to the TXS"""
688 rc = self.sendMsg("BYE");
689 if rc is True:
690 rc = self.recvAckLogged("BYE");
691 self.oTransport.disconnect();
692 return rc;
693
694 def taskUuid(self):
695 """Gets the TXS UUID"""
696 rc = self.sendMsg("UUID");
697 if rc is True:
698 rc = False;
699 cbMsg, sOpcode, abPayload = self.recvReply();
700 if cbMsg is not None:
701 sOpcode = sOpcode.strip()
702 if sOpcode == "ACK UUID":
703 sUuid = getSZ(abPayload, 0);
704 if sUuid is not None:
705 sUuid = '{%s}' % (sUuid,)
706 try:
707 _ = uuid.UUID(sUuid);
708 rc = sUuid;
709 except:
710 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
711 else:
712 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
713 else:
714 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
715 else:
716 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
717 return rc;
718
719 #
720 # Process task
721 # pylint: disable=missing-docstring
722 #
723
724 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
725 # Construct the payload.
726 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
727 for sArg in asArgs:
728 aoPayload.append('%s' % (sArg));
729 aoPayload.append(long(len(asAddEnv)));
730 for sPutEnv in asAddEnv:
731 aoPayload.append('%s' % (sPutEnv));
732 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
733 if utils.isString(o):
734 aoPayload.append(o);
735 elif o is not None:
736 aoPayload.append('|');
737 o.uTxsClientCrc32 = zlib.crc32(b'');
738 else:
739 aoPayload.append('');
740 aoPayload.append('%s' % (sAsUser));
741 aoPayload.append(long(self.cMsTimeout));
742
743 # Kick of the EXEC command.
744 rc = self.sendMsg('EXEC', aoPayload)
745 if rc is True:
746 rc = self.recvAckLogged('EXEC');
747 if rc is True:
748 # Loop till the process completes, feed input to the TXS and
749 # receive output from it.
750 sFailure = "";
751 msPendingInputReply = None;
752 cbMsg, sOpcode, abPayload = (None, None, None);
753 while True:
754 # Pending input?
755 if msPendingInputReply is None \
756 and oStdIn is not None \
757 and not utils.isString(oStdIn):
758 try:
759 sInput = oStdIn.read(65536);
760 except:
761 reporter.errorXcpt('read standard in');
762 sFailure = 'exception reading stdin';
763 rc = None;
764 break;
765 if sInput:
766 oStdIn.uTxsClientCrc32 = zlib.crc32(sInput, oStdIn.uTxsClientCrc32);
767 # Convert to a byte array before handing it of to sendMsg or the string
768 # will get some zero termination added breaking the CRC (and injecting
769 # unwanted bytes).
770 abInput = array.array('B', sInput);
771 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
772 if rc is not True:
773 sFailure = 'sendMsg failure';
774 break;
775 msPendingInputReply = base.timestampMilli();
776 continue;
777
778 rc = self.sendMsg('STDINEOS');
779 oStdIn = None;
780 if rc is not True:
781 sFailure = 'sendMsg failure';
782 break;
783 msPendingInputReply = base.timestampMilli();
784
785 # Wait for input (500 ms timeout).
786 if cbMsg is None:
787 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
788 if cbMsg is None:
789 # Check for time out before restarting the loop.
790 # Note! Only doing timeout checking here does mean that
791 # the TXS may prevent us from timing out by
792 # flooding us with data. This is unlikely though.
793 if self.hasTimedOut() \
794 and ( msPendingInputReply is None \
795 or base.timestampMilli() - msPendingInputReply > 30000):
796 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
797 sFailure = 'timeout';
798 rc = None;
799 break;
800 # Check that the connection is OK.
801 if not self.oTransport.isConnectionOk():
802 self.oTransport.disconnect();
803 sFailure = 'disconnected';
804 rc = False;
805 break;
806 continue;
807
808 # Handle the response.
809 sOpcode = sOpcode.rstrip();
810 if sOpcode == 'STDOUT':
811 oOut = oStdOut;
812 elif sOpcode == 'STDERR':
813 oOut = oStdErr;
814 elif sOpcode == 'TESTPIPE':
815 oOut = oTestPipe;
816 else:
817 oOut = None;
818 if oOut is not None:
819 # Output from the process.
820 if len(abPayload) < 4:
821 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
822 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
823 rc = None;
824 break;
825 uStreamCrc32 = getU32(abPayload, 0);
826 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
827 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
828 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
829 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
830 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
831 rc = None;
832 break;
833 try:
834 oOut.write(abPayload[4:]);
835 except:
836 sFailure = 'exception writing %s' % (sOpcode);
837 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
838 rc = None;
839 break;
840 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
841 # Standard input is ignored. Ignore this condition for now.
842 msPendingInputReply = None;
843 reporter.log('taskExecEx: Standard input is ignored... why?');
844 del oStdIn.uTxsClientCrc32;
845 oStdIn = '/dev/null';
846 elif sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',)\
847 and msPendingInputReply is not None:
848 # TXS STDIN error, abort.
849 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
850 msPendingInputReply = None;
851 sFailure = 'TXS is out of memory for std input buffering';
852 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
853 rc = None;
854 break;
855 elif sOpcode == 'ACK' and msPendingInputReply is not None:
856 msPendingInputReply = None;
857 elif sOpcode.startswith('PROC '):
858 # Process status message, handle it outside the loop.
859 rc = True;
860 break;
861 else:
862 sFailure = 'Unexpected opcode %s' % (sOpcode);
863 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
864 rc = None;
865 break;
866 # Clear the message.
867 cbMsg, sOpcode, abPayload = (None, None, None);
868
869 # If we sent an STDIN packet and didn't get a reply yet, we'll give
870 # TXS some 5 seconds to reply to this. If we don't wait here we'll
871 # get screwed later on if we mix it up with the reply to some other
872 # command. Hackish.
873 if msPendingInputReply is not None:
874 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
875 if cbMsg2 is not None:
876 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
877 % (cbMsg2, sOpcode2, abPayload2));
878 msPendingInputReply = None;
879 else:
880 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
881 self.fScrewedUpMsgState = True;
882
883 # Parse the exit status (True), abort (None) or do nothing (False).
884 if rc is True:
885 if sOpcode != 'PROC OK':
886 # Do proper parsing some other day if needed:
887 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
888 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
889 rc = False;
890 else:
891 if rc is None:
892 # Abort it.
893 reporter.log('taskExecEx: sending ABORT...');
894 rc = self.sendMsg('ABORT');
895 while rc is True:
896 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
897 if cbMsg is None:
898 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
899 self.fScrewedUpMsgState = True;
900 break;
901 if sOpcode.startswith('PROC '):
902 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
903 break;
904 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
905 # Check that the connection is OK before looping.
906 if not self.oTransport.isConnectionOk():
907 self.oTransport.disconnect();
908 break;
909
910 # Fake response with the reason why we quit.
911 if sFailure is not None:
912 self.t3oReply = (0, 'EXECFAIL', sFailure);
913 rc = None;
914 else:
915 rc = None;
916
917 # Cleanup.
918 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
919 if o is not None and not utils.isString(o):
920 del o.uTxsClientCrc32; # pylint: disable=maybe-no-member
921 # Make sure all files are closed
922 o.close(); # pylint: disable=maybe-no-member
923 reporter.log('taskExecEx: returns %s' % (rc));
924 return rc;
925
926 #
927 # Admin tasks
928 #
929
930 def hlpRebootShutdownWaitForAck(self, sCmd):
931 """Wait for reboot/shutodwn ACK."""
932 rc = self.recvAckLogged(sCmd);
933 if rc is True:
934 # poll a little while for server to disconnect.
935 uMsStart = base.timestampMilli();
936 while self.oTransport.isConnectionOk() \
937 and base.timestampMilli() - uMsStart >= 5000:
938 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
939 break;
940 self.oTransport.disconnect();
941 return rc;
942
943 def taskReboot(self):
944 rc = self.sendMsg('REBOOT');
945 if rc is True:
946 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
947 return rc;
948
949 def taskShutdown(self):
950 rc = self.sendMsg('SHUTDOWN');
951 if rc is True:
952 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
953 return rc;
954
955 #
956 # CD/DVD control tasks.
957 #
958
959 ## TODO
960
961 #
962 # File system tasks
963 #
964
965 def taskMkDir(self, sRemoteDir, fMode):
966 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
967 if rc is True:
968 rc = self.recvAckLogged('MKDIR');
969 return rc;
970
971 def taskMkDirPath(self, sRemoteDir, fMode):
972 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
973 if rc is True:
974 rc = self.recvAckLogged('MKDRPATH');
975 return rc;
976
977 def taskMkSymlink(self, sLinkTarget, sLink):
978 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
979 if rc is True:
980 rc = self.recvAckLogged('MKSYMLNK');
981 return rc;
982
983 def taskRmDir(self, sRemoteDir):
984 rc = self.sendMsg('RMDIR', (sRemoteDir,));
985 if rc is True:
986 rc = self.recvAckLogged('RMDIR');
987 return rc;
988
989 def taskRmFile(self, sRemoteFile):
990 rc = self.sendMsg('RMFILE', (sRemoteFile,));
991 if rc is True:
992 rc = self.recvAckLogged('RMFILE');
993 return rc;
994
995 def taskRmSymlink(self, sRemoteSymlink):
996 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
997 if rc is True:
998 rc = self.recvAckLogged('RMSYMLNK');
999 return rc;
1000
1001 def taskRmTree(self, sRemoteTree):
1002 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1003 if rc is True:
1004 rc = self.recvAckLogged('RMTREE');
1005 return rc;
1006
1007 #def "CHMOD "
1008 #def "CHOWN "
1009 #def "CHGRP "
1010
1011 def taskIsDir(self, sRemoteDir):
1012 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1013 if rc is True:
1014 rc = self.recvTrueFalse('ISDIR');
1015 return rc;
1016
1017 def taskIsFile(self, sRemoteFile):
1018 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1019 if rc is True:
1020 rc = self.recvTrueFalse('ISFILE');
1021 return rc;
1022
1023 def taskIsSymlink(self, sRemoteSymlink):
1024 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1025 if rc is True:
1026 rc = self.recvTrueFalse('ISSYMLNK');
1027 return rc;
1028
1029 #def "STAT "
1030 #def "LSTAT "
1031 #def "LIST "
1032
1033 def taskUploadFile(self, sLocalFile, sRemoteFile):
1034 #
1035 # Open the local file (make sure it exist before bothering TXS) and
1036 # tell TXS that we want to upload a file.
1037 #
1038 try:
1039 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1040 except:
1041 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1042 return False;
1043
1044 # Common cause with taskUploadStr
1045 rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
1046
1047 # Cleanup.
1048 oLocalFile.close();
1049 return rc;
1050
1051 def taskUploadString(self, sContent, sRemoteFile):
1052 # Wrap sContent in a file like class.
1053 class InStringFile(object): # pylint: disable=too-few-public-methods
1054 def __init__(self, sContent):
1055 self.sContent = sContent;
1056 self.off = 0;
1057
1058 def read(self, cbMax):
1059 cbLeft = len(self.sContent) - self.off;
1060 if cbLeft == 0:
1061 return "";
1062 if cbLeft <= cbMax:
1063 sRet = self.sContent[self.off:(self.off + cbLeft)];
1064 else:
1065 sRet = self.sContent[self.off:(self.off + cbMax)];
1066 self.off = self.off + len(sRet);
1067 return sRet;
1068
1069 oLocalString = InStringFile(sContent);
1070 return self.taskUploadCommon(oLocalString, sRemoteFile);
1071
1072 def taskUploadCommon(self, oLocalFile, sRemoteFile):
1073 """Common worker used by taskUploadFile and taskUploadString."""
1074 # Command + ACK.
1075 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1076 if rc is True:
1077 rc = self.recvAckLogged('PUT FILE');
1078 if rc is True:
1079 #
1080 # Push data packets until eof.
1081 #
1082 uMyCrc32 = zlib.crc32(b'');
1083 while True:
1084 # Read up to 64 KB of data.
1085 try:
1086 sRaw = oLocalFile.read(65536);
1087 except:
1088 rc = None;
1089 break;
1090
1091 # Convert to array - this is silly!
1092 abBuf = array.array('B');
1093 if utils.isString(sRaw):
1094 for i, _ in enumerate(sRaw):
1095 abBuf.append(ord(sRaw[i]));
1096 else:
1097 abBuf.extend(sRaw);
1098 sRaw = None;
1099
1100 # Update the file stream CRC and send it off.
1101 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1102 if not abBuf:
1103 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1104 else:
1105 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1106 if rc is False:
1107 break;
1108
1109 # Wait for the reply.
1110 rc = self.recvAck();
1111 if rc is not True:
1112 if rc is False:
1113 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1114 else:
1115 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1116 rc = False;
1117 break;
1118
1119 # EOF?
1120 if not abBuf:
1121 break;
1122
1123 # Send ABORT on ACK and I/O errors.
1124 if rc is None:
1125 rc = self.sendMsg('ABORT');
1126 if rc is True:
1127 self.recvAckLogged('ABORT');
1128 rc = False;
1129 return rc;
1130
1131 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1132 try:
1133 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1134 except:
1135 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1136 return False;
1137
1138 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1139
1140 oLocalFile.close();
1141 if rc is False:
1142 try:
1143 os.remove(sLocalFile);
1144 except:
1145 reporter.errorXcpt();
1146 return rc;
1147
1148 def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
1149 # Wrap sContent in a file like class.
1150 class OutStringFile(object): # pylint: disable=too-few-public-methods
1151 def __init__(self):
1152 self.asContent = [];
1153
1154 def write(self, sBuf):
1155 self.asContent.append(sBuf);
1156 return None;
1157
1158 oLocalString = OutStringFile();
1159 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1160 if rc is True:
1161 rc = '';
1162 for sBuf in oLocalString.asContent:
1163 if hasattr(sBuf, 'decode'):
1164 rc += sBuf.decode(sEncoding, 'ignore' if fIgnoreEncodingErrors else 'strict');
1165 else:
1166 rc += sBuf;
1167 return rc;
1168
1169 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1170 """Common worker for taskDownloadFile and taskDownloadString."""
1171 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1172 if rc is True:
1173 #
1174 # Process data packets until eof.
1175 #
1176 uMyCrc32 = zlib.crc32(b'');
1177 while rc is True:
1178 cbMsg, sOpcode, abPayload = self.recvReply();
1179 if cbMsg is None:
1180 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1181 rc = None;
1182 break;
1183
1184 # Validate.
1185 sOpcode = sOpcode.rstrip();
1186 if sOpcode not in ('DATA', 'DATA EOF',):
1187 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1188 % (sOpcode, getSZ(abPayload, 0, "None")));
1189 rc = False;
1190 break;
1191 if sOpcode == 'DATA' and len(abPayload) < 4:
1192 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1193 rc = None;
1194 break;
1195 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1196 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1197 rc = None;
1198 break;
1199
1200 # Check the CRC (common for both packets).
1201 uCrc32 = getU32(abPayload, 0);
1202 if sOpcode == 'DATA':
1203 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1204 if uCrc32 != (uMyCrc32 & 0xffffffff):
1205 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1206 % (hex(uMyCrc32), hex(uCrc32)));
1207 rc = None;
1208 break;
1209 if sOpcode == 'DATA EOF':
1210 rc = self.sendMsg('ACK');
1211 break;
1212
1213 # Finally, push the data to the file.
1214 try:
1215 oLocalFile.write(abPayload[4:].tostring());
1216 except:
1217 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1218 rc = None;
1219 break;
1220 rc = self.sendMsg('ACK');
1221
1222 # Send NACK on validation and I/O errors.
1223 if rc is None:
1224 rc = self.sendMsg('NACK');
1225 rc = False;
1226 return rc;
1227
1228 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1229 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1230 if rc is True:
1231 rc = self.recvAckLogged('UNPKFILE');
1232 return rc;
1233
1234 # pylint: enable=missing-docstring
1235
1236
1237 #
1238 # Public methods - generic task queries
1239 #
1240
1241 def isSuccess(self):
1242 """Returns True if the task completed successfully, otherwise False."""
1243 self.lockTask();
1244 sStatus = self.sStatus;
1245 oTaskRc = self.oTaskRc;
1246 self.unlockTask();
1247 if sStatus != "":
1248 return False;
1249 if oTaskRc is False or oTaskRc is None:
1250 return False;
1251 return True;
1252
1253 def getResult(self):
1254 """
1255 Returns the result of a completed task.
1256 Returns None if not completed yet or no previous task.
1257 """
1258 self.lockTask();
1259 sStatus = self.sStatus;
1260 oTaskRc = self.oTaskRc;
1261 self.unlockTask();
1262 if sStatus != "":
1263 return None;
1264 return oTaskRc;
1265
1266 def getLastReply(self):
1267 """
1268 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1269 Returns a None, None, None three-tuple if there was no last reply.
1270 """
1271 self.lockTask();
1272 t3oReply = self.t3oReply;
1273 self.unlockTask();
1274 return t3oReply;
1275
1276 #
1277 # Public methods - connection.
1278 #
1279
1280 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1281 """
1282 Initiates a disconnect task.
1283
1284 Returns True on success, False on failure (logged).
1285
1286 The task returns True on success and False on failure.
1287 """
1288 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1289
1290 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1291 """Synchronous version."""
1292 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1293
1294 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1295 """
1296 Initiates a task for getting the TXS UUID.
1297
1298 Returns True on success, False on failure (logged).
1299
1300 The task returns UUID string (in {}) on success and False on failure.
1301 """
1302 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskUuid);
1303
1304 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1305 """Synchronous version."""
1306 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1307
1308 #
1309 # Public methods - execution.
1310 #
1311
1312 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1313 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1314 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1315 """
1316 Initiates a exec process task.
1317
1318 Returns True on success, False on failure (logged).
1319
1320 The task returns True if the process exited normally with status code 0.
1321 The task returns None if on failure prior to executing the process, and
1322 False if the process exited with a different status or in an abnormal
1323 manner. Both None and False are logged of course and further info can
1324 also be obtained by getLastReply().
1325
1326 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1327 these streams. If None, no special action is taken and the output goes
1328 to where ever the TXS sends its output, and ditto for input.
1329 - To send to / read from the bitbucket, pass '/dev/null'.
1330 - To redirect to/from a file, just specify the remote filename.
1331 - To append to a file use '>>' followed by the remote filename.
1332 - To pipe the stream to/from the TXS, specify a file like
1333 object. For StdIn a non-blocking read() method is required. For
1334 the other a write() method is required. Watch out for deadlock
1335 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1336 """
1337 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1338 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1339 oStdOut, oStdErr, oTestPipe, sAsUser));
1340
1341 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
1342 oStdIn = '/dev/null', oStdOut = '/dev/null',
1343 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1344 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1345 """Synchronous version."""
1346 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1347 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1348
1349 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1350 cMsTimeout = 3600000, fIgnoreErrors = False):
1351 """
1352 Initiates a exec process test task.
1353
1354 Returns True on success, False on failure (logged).
1355
1356 The task returns True if the process exited normally with status code 0.
1357 The task returns None if on failure prior to executing the process, and
1358 False if the process exited with a different status or in an abnormal
1359 manner. Both None and False are logged of course and further info can
1360 also be obtained by getLastReply().
1361
1362 Standard in is taken from /dev/null. While both standard output and
1363 standard error goes directly to reporter.log(). The testpipe is piped
1364 to reporter.xxxx.
1365 """
1366
1367 sStdIn = '/dev/null';
1368 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1369 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1370 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1371 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1372
1373 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1374 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1375
1376 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1377 cMsTimeout = 3600000, fIgnoreErrors = False):
1378 """Synchronous version."""
1379 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1380 cMsTimeout, fIgnoreErrors);
1381
1382 #
1383 # Public methods - system
1384 #
1385
1386 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1387 """
1388 Initiates a reboot task.
1389
1390 Returns True on success, False on failure (logged).
1391
1392 The task returns True on success, False on failure (logged). The
1393 session will be disconnected on successful task completion.
1394 """
1395 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1396
1397 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1398 """Synchronous version."""
1399 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1400
1401 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1402 """
1403 Initiates a shutdown task.
1404
1405 Returns True on success, False on failure (logged).
1406
1407 The task returns True on success, False on failure (logged).
1408 """
1409 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1410
1411 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1412 """Synchronous version."""
1413 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1414
1415
1416 #
1417 # Public methods - file system
1418 #
1419
1420 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1421 """
1422 Initiates a mkdir task.
1423
1424 Returns True on success, False on failure (logged).
1425
1426 The task returns True on success, False on failure (logged).
1427 """
1428 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1429
1430 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1431 """Synchronous version."""
1432 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1433
1434 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1435 """
1436 Initiates a mkdir -p task.
1437
1438 Returns True on success, False on failure (logged).
1439
1440 The task returns True on success, False on failure (logged).
1441 """
1442 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1443
1444 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1445 """Synchronous version."""
1446 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1447
1448 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1449 """
1450 Initiates a symlink task.
1451
1452 Returns True on success, False on failure (logged).
1453
1454 The task returns True on success, False on failure (logged).
1455 """
1456 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1457
1458 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1459 """Synchronous version."""
1460 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1461
1462 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1463 """
1464 Initiates a rmdir task.
1465
1466 Returns True on success, False on failure (logged).
1467
1468 The task returns True on success, False on failure (logged).
1469 """
1470 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1471
1472 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1473 """Synchronous version."""
1474 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1475
1476 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1477 """
1478 Initiates a rmfile task.
1479
1480 Returns True on success, False on failure (logged).
1481
1482 The task returns True on success, False on failure (logged).
1483 """
1484 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1485
1486 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1487 """Synchronous version."""
1488 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1489
1490 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1491 """
1492 Initiates a rmsymlink task.
1493
1494 Returns True on success, False on failure (logged).
1495
1496 The task returns True on success, False on failure (logged).
1497 """
1498 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1499
1500 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1501 """Synchronous version."""
1502 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1503
1504 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1505 """
1506 Initiates a rmtree task.
1507
1508 Returns True on success, False on failure (logged).
1509
1510 The task returns True on success, False on failure (logged).
1511 """
1512 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1513
1514 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1515 """Synchronous version."""
1516 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1517
1518 #def "CHMOD "
1519 #def "CHOWN "
1520 #def "CHGRP "
1521
1522 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1523 """
1524 Initiates a is-dir query task.
1525
1526 Returns True on success, False on failure (logged).
1527
1528 The task returns True if it's a directory, False if it isn't, and
1529 None on error (logged).
1530 """
1531 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1532
1533 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1534 """Synchronous version."""
1535 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1536
1537 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1538 """
1539 Initiates a is-file query task.
1540
1541 Returns True on success, False on failure (logged).
1542
1543 The task returns True if it's a file, False if it isn't, and None on
1544 error (logged).
1545 """
1546 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1547
1548 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1549 """Synchronous version."""
1550 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1551
1552 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1553 """
1554 Initiates a is-symbolic-link query task.
1555
1556 Returns True on success, False on failure (logged).
1557
1558 The task returns True if it's a symbolic linke, False if it isn't, and
1559 None on error (logged).
1560 """
1561 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1562
1563 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1564 """Synchronous version."""
1565 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1566
1567 #def "STAT "
1568 #def "LSTAT "
1569 #def "LIST "
1570
1571 def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1572 """
1573 Initiates a download query task.
1574
1575 Returns True on success, False on failure (logged).
1576
1577 The task returns True on success, False on failure (logged).
1578 """
1579 return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, (sLocalFile, sRemoteFile));
1580
1581 def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1582 """Synchronous version."""
1583 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1584
1585 def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1586 """
1587 Initiates a upload string task.
1588
1589 Returns True on success, False on failure (logged).
1590
1591 The task returns True on success, False on failure (logged).
1592 """
1593 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, (sContent, sRemoteFile));
1594
1595 def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1596 """Synchronous version."""
1597 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1598
1599 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1600 """
1601 Initiates a download file task.
1602
1603 Returns True on success, False on failure (logged).
1604
1605 The task returns True on success, False on failure (logged).
1606 """
1607 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1608
1609 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1610 """Synchronous version."""
1611 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1612
1613 def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1614 cMsTimeout = 30000, fIgnoreErrors = False):
1615 """
1616 Initiates a download string task.
1617
1618 Returns True on success, False on failure (logged).
1619
1620 The task returns a byte string on success, False on failure (logged).
1621 """
1622 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString",
1623 self.taskDownloadString, (sRemoteFile, sEncoding, fIgnoreEncodingErrors));
1624
1625 def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
1626 cMsTimeout = 30000, fIgnoreErrors = False):
1627 """Synchronous version."""
1628 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, sEncoding, fIgnoreEncodingErrors,
1629 cMsTimeout, fIgnoreErrors);
1630
1631 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1632 """
1633 Initiates a unpack file task.
1634
1635 Returns True on success, False on failure (logged).
1636
1637 The task returns True on success, False on failure (logged).
1638 """
1639 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile,
1640 (sRemoteFile, sRemoteDir));
1641
1642 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1643 """Synchronous version."""
1644 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1645
1646
1647class TransportTcp(TransportBase):
1648 """
1649 TCP transport layer for the TXS client session class.
1650 """
1651
1652 def __init__(self, sHostname, uPort, fReversedSetup):
1653 """
1654 Save the parameters. The session will call us back to make the
1655 connection later on its worker thread.
1656 """
1657 TransportBase.__init__(self, utils.getCallerName());
1658 self.sHostname = sHostname;
1659 self.fReversedSetup = fReversedSetup;
1660 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1661 self.oSocket = None;
1662 self.oWakeupW = None;
1663 self.oWakeupR = None;
1664 self.fConnectCanceled = False;
1665 self.fIsConnecting = False;
1666 self.oCv = threading.Condition();
1667 self.abReadAhead = array.array('B');
1668
1669 def toString(self):
1670 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1671 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1672 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1673 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1674
1675 def __isInProgressXcpt(self, oXcpt):
1676 """ In progress exception? """
1677 try:
1678 if isinstance(oXcpt, socket.error):
1679 try:
1680 if oXcpt.errno == errno.EINPROGRESS:
1681 return True;
1682 except: pass;
1683 # Windows?
1684 try:
1685 if oXcpt.errno == errno.EWOULDBLOCK:
1686 return True;
1687 except: pass;
1688 except:
1689 pass;
1690 return False;
1691
1692 def __isWouldBlockXcpt(self, oXcpt):
1693 """ Would block exception? """
1694 try:
1695 if isinstance(oXcpt, socket.error):
1696 try:
1697 if oXcpt.errno == errno.EWOULDBLOCK:
1698 return True;
1699 except: pass;
1700 try:
1701 if oXcpt.errno == errno.EAGAIN:
1702 return True;
1703 except: pass;
1704 except:
1705 pass;
1706 return False;
1707
1708 def __isConnectionReset(self, oXcpt):
1709 """ Connection reset by Peer or others. """
1710 try:
1711 if isinstance(oXcpt, socket.error):
1712 try:
1713 if oXcpt.errno == errno.ECONNRESET:
1714 return True;
1715 except: pass;
1716 try:
1717 if oXcpt.errno == errno.ENETRESET:
1718 return True;
1719 except: pass;
1720 except:
1721 pass;
1722 return False;
1723
1724 def _closeWakeupSockets(self):
1725 """ Closes the wakup sockets. Caller should own the CV. """
1726 oWakeupR = self.oWakeupR;
1727 self.oWakeupR = None;
1728 if oWakeupR is not None:
1729 oWakeupR.close();
1730
1731 oWakeupW = self.oWakeupW;
1732 self.oWakeupW = None;
1733 if oWakeupW is not None:
1734 oWakeupW.close();
1735
1736 return None;
1737
1738 def cancelConnect(self):
1739 # This is bad stuff.
1740 self.oCv.acquire();
1741 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1742 self.fConnectCanceled = True;
1743 if self.fIsConnecting:
1744 oSocket = self.oSocket;
1745 self.oSocket = None;
1746 if oSocket is not None:
1747 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1748 oSocket.close();
1749
1750 oWakeupW = self.oWakeupW;
1751 self.oWakeupW = None;
1752 if oWakeupW is not None:
1753 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1754 try: oWakeupW.send('cancelled!\n');
1755 except: reporter.logXcpt();
1756 try: oWakeupW.shutdown(socket.SHUT_WR);
1757 except: reporter.logXcpt();
1758 oWakeupW.close();
1759 self.oCv.release();
1760
1761 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1762 """ Connects to the TXS server as server, i.e. the reversed setup. """
1763 assert(self.fReversedSetup);
1764
1765 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1766
1767 # Workaround for bind() failure...
1768 try:
1769 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1770 except:
1771 reporter.errorXcpt('socket.listen(1) failed');
1772 return None;
1773
1774 # Bind the socket and make it listen.
1775 try:
1776 oSocket.bind((self.sHostname, self.uPort));
1777 except:
1778 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1779 return None;
1780 try:
1781 oSocket.listen(1);
1782 except:
1783 reporter.errorXcpt('socket.listen(1) failed');
1784 return None;
1785
1786 # Accept connections.
1787 oClientSocket = None;
1788 tClientAddr = None;
1789 try:
1790 (oClientSocket, tClientAddr) = oSocket.accept();
1791 except socket.error as e:
1792 if not self.__isInProgressXcpt(e):
1793 raise;
1794
1795 # Do the actual waiting.
1796 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1797 try:
1798 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1799 except socket.error as e:
1800 if e[0] != errno.EBADF or not self.fConnectCanceled:
1801 raise;
1802 reporter.log('socket.select() on accept was canceled');
1803 return None;
1804 except:
1805 reporter.logXcpt('socket.select() on accept');
1806
1807 # Try accept again.
1808 try:
1809 (oClientSocket, tClientAddr) = oSocket.accept();
1810 except socket.error as e:
1811 if not self.__isInProgressXcpt(e):
1812 if e[0] != errno.EBADF or not self.fConnectCanceled:
1813 raise;
1814 reporter.log('socket.accept() was canceled');
1815 return None;
1816 reporter.log('socket.accept() timed out');
1817 return False;
1818 except:
1819 reporter.errorXcpt('socket.accept() failed');
1820 return None;
1821 except:
1822 reporter.errorXcpt('socket.accept() failed');
1823 return None;
1824
1825 # Store the connected socket and throw away the server socket.
1826 self.oCv.acquire();
1827 if not self.fConnectCanceled:
1828 self.oSocket.close();
1829 self.oSocket = oClientSocket;
1830 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1831 self.oCv.release();
1832 return True;
1833
1834 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1835 """ Connects to the TXS server as client. """
1836 assert(not self.fReversedSetup);
1837
1838 # Connect w/ timeouts.
1839 rc = None;
1840 try:
1841 oSocket.connect((self.sHostname, self.uPort));
1842 rc = True;
1843 except socket.error as oXcpt:
1844 iRc = oXcpt.errno;
1845 if self.__isInProgressXcpt(oXcpt):
1846 # Do the actual waiting.
1847 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
1848 try:
1849 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1850 if len(ttRc[1]) + len(ttRc[2]) == 0:
1851 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1852 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1853 rc = iRc == 0;
1854 except socket.error as oXcpt2:
1855 iRc = oXcpt2.errno;
1856 except:
1857 iRc = -42;
1858 reporter.fatalXcpt('socket.select() on connect failed');
1859
1860 if rc is True:
1861 pass;
1862 elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
1863 rc = False; # try again.
1864 else:
1865 if iRc != errno.EBADF or not self.fConnectCanceled:
1866 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
1867 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
1868 except:
1869 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
1870 return rc;
1871
1872
1873 def connect(self, cMsTimeout):
1874 # Create a non-blocking socket.
1875 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
1876 try:
1877 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
1878 except:
1879 reporter.fatalXcpt('socket.socket() failed');
1880 return None;
1881 try:
1882 oSocket.setblocking(0);
1883 except:
1884 oSocket.close();
1885 reporter.fatalXcpt('socket.socket() failed');
1886 return None;
1887
1888 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
1889 oWakeupR = None;
1890 oWakeupW = None;
1891 if hasattr(socket, 'socketpair'):
1892 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
1893 except: reporter.logXcpt('socket.socketpair() failed');
1894
1895 # Update the state.
1896 self.oCv.acquire();
1897 rc = None;
1898 if not self.fConnectCanceled:
1899 self.oSocket = oSocket;
1900 self.oWakeupW = oWakeupW;
1901 self.oWakeupR = oWakeupR;
1902 self.fIsConnecting = True;
1903 self.oCv.release();
1904
1905 # Try connect.
1906 if oWakeupR is None:
1907 oWakeupR = oSocket; # Avoid select failure.
1908 if self.fReversedSetup:
1909 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
1910 else:
1911 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
1912 oSocket = None;
1913
1914 # Update the state and cleanup on failure/cancel.
1915 self.oCv.acquire();
1916 if rc is True and self.fConnectCanceled:
1917 rc = False;
1918 self.fIsConnecting = False;
1919
1920 if rc is not True:
1921 if self.oSocket is not None:
1922 self.oSocket.close();
1923 self.oSocket = None;
1924 self._closeWakeupSockets();
1925 self.oCv.release();
1926
1927 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
1928 return rc;
1929
1930 def disconnect(self, fQuiet = False):
1931 if self.oSocket is not None:
1932 self.abReadAhead = array.array('B');
1933
1934 # Try a shutting down the socket gracefully (draining it).
1935 try:
1936 self.oSocket.shutdown(socket.SHUT_WR);
1937 except:
1938 if not fQuiet:
1939 reporter.error('shutdown(SHUT_WR)');
1940 try:
1941 self.oSocket.setblocking(0); # just in case it's not set.
1942 sData = "1";
1943 while sData:
1944 sData = self.oSocket.recv(16384);
1945 except:
1946 pass;
1947
1948 # Close it.
1949 self.oCv.acquire();
1950 try: self.oSocket.setblocking(1);
1951 except: pass;
1952 self.oSocket.close();
1953 self.oSocket = None;
1954 else:
1955 self.oCv.acquire();
1956 self._closeWakeupSockets();
1957 self.oCv.release();
1958
1959 def sendBytes(self, abBuf, cMsTimeout):
1960 if self.oSocket is None:
1961 reporter.error('TransportTcp.sendBytes: No connection.');
1962 return False;
1963
1964 # Try send it all.
1965 try:
1966 cbSent = self.oSocket.send(abBuf);
1967 if cbSent == len(abBuf):
1968 return True;
1969 except Exception as oXcpt:
1970 if not self.__isWouldBlockXcpt(oXcpt):
1971 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
1972 return False;
1973 cbSent = 0;
1974
1975 # Do a timed send.
1976 msStart = base.timestampMilli();
1977 while True:
1978 cMsElapsed = base.timestampMilli() - msStart;
1979 if cMsElapsed > cMsTimeout:
1980 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
1981 break;
1982
1983 # wait.
1984 try:
1985 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
1986 if ttRc[2] and not ttRc[1]:
1987 reporter.error('TranportTcp.sendBytes: select returned with exception');
1988 break;
1989 if not ttRc[1]:
1990 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
1991 break;
1992 except:
1993 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
1994 break;
1995
1996 # Try send more.
1997 try:
1998 cbSent += self.oSocket.send(abBuf[cbSent:]);
1999 if cbSent == len(abBuf):
2000 return True;
2001 except Exception as oXcpt:
2002 if not self.__isWouldBlockXcpt(oXcpt):
2003 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2004 break;
2005
2006 return False;
2007
2008 def __returnReadAheadBytes(self, cb):
2009 """ Internal worker for recvBytes. """
2010 assert(len(self.abReadAhead) >= cb);
2011 abRet = self.abReadAhead[:cb];
2012 self.abReadAhead = self.abReadAhead[cb:];
2013 return abRet;
2014
2015 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2016 if self.oSocket is None:
2017 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2018 return None;
2019
2020 # Try read in some more data without bothering with timeout handling first.
2021 if len(self.abReadAhead) < cb:
2022 try:
2023 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2024 if abBuf:
2025 self.abReadAhead.extend(array.array('B', abBuf));
2026 except Exception as oXcpt:
2027 if not self.__isWouldBlockXcpt(oXcpt):
2028 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2029 return None;
2030
2031 if len(self.abReadAhead) >= cb:
2032 return self.__returnReadAheadBytes(cb);
2033
2034 # Timeout loop.
2035 msStart = base.timestampMilli();
2036 while True:
2037 cMsElapsed = base.timestampMilli() - msStart;
2038 if cMsElapsed > cMsTimeout:
2039 if not fNoDataOk or self.abReadAhead:
2040 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2041 break;
2042
2043 # Wait.
2044 try:
2045 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2046 if ttRc[2] and not ttRc[0]:
2047 reporter.error('TranportTcp.recvBytes: select returned with exception');
2048 break;
2049 if not ttRc[0]:
2050 if not fNoDataOk or self.abReadAhead:
2051 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2052 % (len(self.abReadAhead), cb, fNoDataOk));
2053 break;
2054 except:
2055 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2056 break;
2057
2058 # Try read more.
2059 try:
2060 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2061 if not abBuf:
2062 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2063 % (len(self.abReadAhead), cb, fNoDataOk));
2064 self.disconnect();
2065 return None;
2066
2067 self.abReadAhead.extend(array.array('B', abBuf));
2068
2069 except Exception as oXcpt:
2070 reporter.log('recv => exception %s' % (oXcpt,));
2071 if not self.__isWouldBlockXcpt(oXcpt):
2072 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2073 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2074 break;
2075
2076 # Done?
2077 if len(self.abReadAhead) >= cb:
2078 return self.__returnReadAheadBytes(cb);
2079
2080 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2081 return None;
2082
2083 def isConnectionOk(self):
2084 if self.oSocket is None:
2085 return False;
2086 try:
2087 ttRc = select.select([], [], [self.oSocket], 0.0);
2088 if ttRc[2]:
2089 return False;
2090
2091 self.oSocket.send(array.array('B')); # send zero bytes.
2092 except:
2093 return False;
2094 return True;
2095
2096 def isRecvPending(self, cMsTimeout = 0):
2097 try:
2098 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2099 if not ttRc[0]:
2100 return False;
2101 except:
2102 pass;
2103 return True;
2104
2105
2106def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2107 """
2108 Opens a connection to a Test Execution Service via TCP, given its name.
2109 """
2110 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % \
2111 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2112 try:
2113 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2114 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge);
2115 except:
2116 reporter.errorXcpt(None, 15);
2117 return None;
2118 return oSession;
2119
2120
2121def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2122 """
2123 Tries to open a connection to a Test Execution Service via TCP, given its name.
2124
2125 This differs from openTcpSession in that it won't log a connection failure
2126 as an error.
2127 """
2128 try:
2129 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2130 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True);
2131 except:
2132 reporter.errorXcpt(None, 15);
2133 return None;
2134 return oSession;
2135
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette