VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 60851

Last change on this file since 60851 was 60851, checked in by vboxsync, 9 years ago

ValidationKit: Make sure we close all files when Session.taskExec() returns. Fixes submitting data from the test pipe to the testmanager.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 78.8 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 60851 2016-05-05 17:11:08Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2015 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 60851 $"
30
31# Standard Python imports.
32import array
33import errno
34import os
35import select
36import socket
37import threading
38import time
39import types
40import zlib
41import uuid
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49#
50# Helpers for decoding data received from the TXS.
51# These are used by both the Session and Transport classes.
52#
53
def getU32(abData, off):
    """
    Get a U32 field.

    Decodes the four bytes starting at abData[off] as a little endian
    value, i.e. the byte at off is the least significant one.
    """
    u32 = 0;
    for iByte in range(4):
        u32 += abData[off + iByte] * (256 ** iByte);
    return u32;
60
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated string field.

    The bytes from off up to (but not including) the terminator are
    decoded as UTF-8.

    Returns sDefault if the string is invalid, that is when getSZLen()
    reports a negative length or when the decoding fails (the latter is
    logged as an error).
    """
    cchStr = getSZLen(abData, off);
    if cchStr < 0:
        return sDefault;

    abStr = abData[off:(off + cchStr)];
    try:
        sRet = abStr.tostring().decode('utf_8');
    except:
        reporter.errorXcpt('getSZ(,%u)' % (off));
        return sDefault;
    return sRet;
74
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    The terminator itself is not included in the returned length.

    Returns -1 if off is beyond the data packet or not properly terminated.
    """
    cbTotal = len(abData);
    if off >= cbTotal:
        return -1;

    # Scan forward for the terminator; running off the end means the
    # string isn't terminated.
    for offScan in range(off, cbTotal):
        if abData[offScan] == 0:
            return offScan - off;
    return -1;
91
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters, the remaining six may be upper case
    letters, digits, dash, underscore or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;

    # Note! The ranges used to be (0, 1) and (2, 7), which left character
    #       #1 and character #7 completely unchecked.
    for i in range(0, 2):
        if sSet1.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    for i in range(2, 8):
        if sSet2.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
112
113#
114# Helper for encoding data sent to the TXS.
115#
116
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Floor division is used explicitly so the result consists of integers
    regardless of how the interpreter defines the '/' operator.  Negative
    input (e.g. a signed CRC-32) wraps to its two's complement bytes, as
    each byte is reduced modulo 256.
    """
    return array.array('B', \
                       (  u32              % 256, \
                         (u32 // 256)      % 256, \
                         (u32 // 65536)    % 256, \
                         (u32 // 16777216) % 256) );
124
125
126
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses supply the raw byte transport by overriding connect(),
    disconnect(), sendBytes(), recvBytes(), isConnectionOk() and
    isRecvPending().  This class implements the message framing on top of
    that: a 16 byte header (u32 message length, u32 CRC-32, 8 character
    opcode) followed by the payload, zero padded to a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header of a message whose payload could not be received yet; it
        # is consumed by the next recvMsg() call.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        # Must be None (not False) as documented: recvMsg() tests for None
        # and calls len() on anything else.
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, at least two characters.  It is space
                    padded to 8 characters before being validated.
        cMsTimeout: Timeout passed on to sendBytes().
        abPayload:  The already encoded payload (array('B')), None or
                    omitted for no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        if abPayload is None:
            abPayload = array.array('B');

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if len(abPayload) > 0:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload, excluding the padding), pad it and
        # send it off.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: Timeout passed on to recvBytes().
        fNoDataOk:  Passed on to recvBytes() for the header read; when set
                    a missing payload isn't logged either.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, unless a previous call left one behind.
        if len(self.abReadAheadHdr) > 0:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg   = getU32(abHdr, 0);
        uCrc32  = getU32(abHdr, 4);
        sOpcode = abHdr[8:16].tostring().decode('ascii');

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32 (a zero CRC in the header means "not checksummed").
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Each payload item is encoded according to its type: strings as zero
        terminated UTF-8, longs as little endian u32 values and byte arrays
        (array.array) verbatim.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if isinstance(o, basestring):
                    # the primitive approach...
                    for ch in o.encode('utf_8'):
                        abPayload.append(ord(ch));
                    abPayload.append(0);
                elif isinstance(o, types.LongType):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
374
375
376class Session(TdTaskBase):
377 """
378 A Test eXecution Service (TXS) client session.
379 """
380
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
    """
    Construct a TXS session.

    oTransport:     The transport object used to talk to the TXS.
    cMsTimeout:     Timeout for the initial connect task (see startTask).
    cMsIdleFudge:   Passed on to taskConnect.
    fTryConnect:    Stored as is; the connect and greet tasks use it to
                    decide how noisy failures should be.

    This starts by connecting to the TXS and will enter the signalled state
    when connected or the timeout has been reached.

    Raises base.GenError if the connect task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName());

    # What the caller gave us.
    self.oTransport         = oTransport;
    self.fTryConnect        = fTryConnect;

    # Task state, (re-)initialized by startTask for each task.
    self.sStatus            = "";
    self.cMsTimeout         = 0;
    self.msStart            = 0;
    self.fErr               = True;                 # Whether to report errors as error.
    self.oThread            = None;
    self.fnTask             = self.taskDummy;
    self.aTaskArgs          = None;

    # Task results.
    self.oTaskRc            = None;
    self.t3oReply           = (None, None, None);
    self.fScrewedUpMsgState = False;

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
404
def __del__(self):
    """
    Destructor.

    Cancels whatever task may still be pending; the result of the
    cancellation is ignored.
    """
    _ = self.cancelTask();
408
def toString(self):
    """
    Stringify the instance for logging and debugging, including the base
    class description and the current task state.
    """
    sFmt = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
           ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    aoValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    );
    return sFmt % aoValues;
414
def taskDummy(self):
    """
    Place holder to catch broken state handling.

    This is the initial value of self.fnTask; it always raises Exception.
    """
    raise Exception();
418
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks off a new task.

    cMsTimeout:     The task timeout in milliseconds.  Values less than
                    500 ms will be adjusted to 500 ms.  This means it is
                    OK to use negative value.
    fIgnoreErrors:  When set, failures are not reported as errors, neither
                    here nor (via self.fErr) by the task itself.
    sStatus:        The task status.
    fnTask:         The method that'll execute the task.
    aArgs:          Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Claim the session by switching to the "setup" status, provided
    # nobody else got there first.
    self.lockTask();
    fRaced = self.sStatus != "";
    if not fRaced:
        self.sStatus  = "setup";
        self.oTaskRc  = None;
        self.t3oReply = (None, None, None);
        self.resetTaskLocked();
    self.unlockTask();
    if fRaced:
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;

    # Configure the task and create the (daemon) thread for it.
    self.cMsTimeout = cMsTimeout if cMsTimeout > 500 else 500;
    self.fErr       = not fIgnoreErrors;
    self.fnTask     = fnTask;
    self.aTaskArgs  = aArgs;
    sThreadName     = 'TXS-%s' % (sStatus);
    self.oThread    = threading.Thread(target = self.taskThread, args = (), name = sThreadName);
    self.oThread.setDaemon(True);
    self.msStart    = base.timestampMilli();

    # Publish the real status and let the thread loose.
    self.lockTask();
    self.sStatus = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
462
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate.

    Returns success indicator (True/False):
        - True if no task is pending, or if the task thread terminated
          while we waited for it.
        - False if a task is being set up, a cancellation is already in
          progress, fSync is False, or the thread is still alive after
          the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus == "setup":
        self.unlockTask();
        return False;
    if self.sStatus == "cancelled":
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);
    # Success means the thread is gone.  This used to return isAlive()
    # directly, i.e. True on failure, which made startTask() fail whenever
    # it managed to cancel a previous task.
    return not oThread.isAlive();
493
def taskThread(self):
    """
    The task thread function.

    This does some housekeeping activities around the real task method
    call: it runs self.fnTask with self.aTaskArgs unless the task has been
    cancelled already, and then, holding the task lock, stores the result
    in self.oTaskRc, clears the thread reference and the status and
    signals the task.

    An exception raised by the task method is logged and results in a
    task result of None, as does a task that was cancelled before it ran.

    Returns None.
    """
    # Must be initialized up front: the cancelled path below doesn't run
    # the task and would otherwise leave the variable unbound, killing the
    # thread before the status is reset and the task signalled.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
519
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.

    Reads the status while holding the task lock.
    Returns True if the status is "cancelled", otherwise False.
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    self.unlockTask();
    return sCurStatus == "cancelled";
528
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.

    Returns True if getMsLeft() reports no time left, otherwise False.
    """
    return self.getMsLeft() <= 0;
535
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout.

    cMsMin: The smallest value to return; also returned when the clock
            appears to have gone backwards.
    cMsMax: Upper limit for the return value, ignored unless positive.

    Returns the number of milliseconds left, clamped as described.
    """
    msNow    = base.timestampMilli();
    cMsSpent = msNow - self.msStart;
    if cMsSpent < 0:
        return cMsMin;

    cMsRemaining = self.cMsTimeout - cMsSpent;
    if cMsRemaining <= cMsMin:
        return cMsMin;
    if cMsMax > 0 and cMsRemaining > cMsMax:
        return cMsMax;
    return cMsRemaining;
547
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    so the client can inspect it later on.

    cMsTimeout: Receive timeout in milliseconds.  None means the time
                left for the current task, but at least 500 ms.
    fNoDataOk:  Passed on to recvMsg.

    Returns the recvMsg three-tuple (length, opcode, payload), which is
    also stored in self.t3oReply under the task lock.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
    self.lockTask();
    self.t3oReply = (cbMsg, sOpcode, abPayload);
    self.unlockTask();
    return (cbMsg, sOpcode, abPayload);
560
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode
    itself is used when the payload has no valid string).
    """
    t3oReply = self.recvReply(None, fNoDataOk);
    if t3oReply[0] is None:
        return False;

    sStripped = t3oReply[1].strip();
    if sStripped != "ACK":
        return (sStripped, getSZ(t3oReply[2], 0, sStripped));
    return True;
577
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    When fNoDataOk is set nothing is logged and the recvAck result is
    returned unmodified.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is True or fNoDataOk:
        return rc;

    if rc is False:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
    else:
        sOpcode, sDetails = rc[0], rc[1];
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, sDetails));
    return False;
592
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command name, only used for logging.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
        return None;

    sStripped = sOpcode.strip();
    dAnswers  = { "TRUE": True, "FALSE": False };
    if sStripped in dAnswers:
        return dAnswers[sStripped];

    sDetails = getSZ(abPayload, 0, sStripped);
    reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sStripped, sDetails));
    return None;
610
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    sOpcode:    The opcode to send.
    aoPayload:  The payload items, see TransportBase.sendMsg.
    cMsTimeout: Send timeout in milliseconds.  None means the time left
                for the current task, but at least 500 ms.

    Returns whatever TransportBase.sendMsg returns.
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
618
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    fnAsync:    The method starting the asynchronous task.
    aArgs:      The arguments for fnAsync.

    The task is given its own timeout plus 5 seconds to complete; if it
    doesn't, it is cancelled.

    Returns False on failure, task return status on success.
    """
    fStarted = fnAsync(*aArgs);
    if fStarted is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return fStarted;

    fCompleted = self.waitForTask(self.cMsTimeout + 5000);
    if fCompleted is not False:
        return self.getResult();

    reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
    self.cancelTask();
    return False;
640
641 #
642 # Connection tasks.
643 #
644
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS.

    Keeps retrying the transport connect until it succeeds, fails fatally,
    the task times out or gets cancelled, sleeping up to a second between
    the attempts.

    Returns the taskGreet result on a successful connect.
    Returns None if the transport reports a fatal connect error.
    Returns False on timeout and cancellation (only reported as an error
    when fTryConnect isn't set).
    """
    fLoud = not self.fTryConnect;
    while not self.isCancelled():
        reporter.log2('taskConnect: connecting ...');
        fConnected = self.oTransport.connect(self.getMsLeft(500));
        if fConnected is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if fConnected is None:
            reporter.log2('taskConnect: unable to connect');
            return None;

        # Retryable failure, see if there is time for another go.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if fLoud:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;
        cSecsToSleep = self.getMsLeft(1, 1000) / 1000.0;
        time.sleep(cSecsToSleep);

    if fLoud:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
    return False;
665
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS.

    Sends HOWDY and waits for the ACK.  On success it then sleeps in one
    second steps until cMsIdleFudge milliseconds are used up; on failure
    the transport is disconnected.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    rc = self.sendMsg("HOWDY", ());
    if rc is True:
        rc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if rc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return rc;

    cMsFudgeLeft = cMsIdleFudge;
    while cMsFudgeLeft > 0:
        cMsFudgeLeft -= 1000;
        time.sleep(1);
    return rc;
678
def taskBye(self):
    """
    Says goodbye to the TXS.

    Sends BYE, waits for the ACK if the send succeeded and disconnects the
    transport regardless of the outcome.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fRc = self.sendMsg("BYE");
    fRc = self.recvAckLogged("BYE") if fRc is True else fRc;
    self.oTransport.disconnect();
    return fRc;
686
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply is missing, isn't an 'ACK UUID' or doesn't
    carry a valid UUID string (logged).
    Returns the sendMsg status if the request could not be sent.
    """
    fSent = self.sendMsg("UUID");
    if fSent is not True:
        return fSent;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Validate it by letting the uuid module parse it.
    sUuid = '{%s}' % (sUuid,);
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
711
712 #
713 # Process task
714 # pylint: disable=C0111
715 #
716
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
    """
    Executes a program on the TXS side and shuffles its standard streams.

    sExecName:  The program to execute.
    fFlags:     Flags, sent to the TXS as a u32.
    asArgs:     The program arguments.
    asAddEnv:   Environment changes.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either a string which is sent to the TXS as is,
                None (sent as an empty string), or a file like object.
                File like objects are announced to the TXS as '|' and are
                read from (stdin) or written to (the rest) here.  They get
                a temporary uTxsClientCrc32 attribute for the running
                stream CRC and are closed before returning.
    sAsUser:    The user to execute the program as.

    Returns True if the process completed with 'PROC OK', False if it
    completed with any other 'PROC xxx' status, and None on all other
    failures.  In the latter case self.t3oReply is typically replaced by a
    fake (0, 'EXECFAIL', <reason>) reply.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if isinstance(o, basestring):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            o.uTxsClientCrc32 = zlib.crc32('');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure            = "";
        msPendingInputReply = None;
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?
            if     msPendingInputReply is None \
               and oStdIn is not None \
               and not isinstance(oStdIn, basestring):
                try:
                    abInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if len(abInput) > 0:
                    # Convert string input to a byte array before handing it
                    # to sendMsg.  A string would get UTF-8 encoded and zero
                    # terminated there, so the TXS would receive bytes that
                    # aren't covered by the stream CRC calculated here.
                    if isinstance(abInput, basestring):
                        if not isinstance(abInput, str):
                            abInput = abInput.encode('utf_8');
                        abInput = array.array('B', abInput);
                    oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                #       the TXS may prevent us from timing out by
                #       flooding us with data.  This is unlikely though.
                if     self.hasTimedOut() \
                   and (   msPendingInputReply is None \
                        or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process: u32 stream CRC followed by the data.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif  (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
              and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmit it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode == 'PROC OK':
                rc = True;
            else:
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                rc = False;
        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not isinstance(o, basestring):
            del o.uTxsClientCrc32;      # pylint: disable=E1103
            o.close();                  # Make sure the files are closed
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
908
909 #
910 # Admin tasks
911 #
912
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command name, used for logging by recvAckLogged.

    After a successful ACK this polls for up to 5 seconds for the server
    to drop the connection (or send something) and then disconnects.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.
        # Note! The elapsed time test used to be '>= 5000', which is never
        #       true right after taking the start timestamp, so no polling
        #       took place at all.
        uMsStart = base.timestampMilli();
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
925
def taskReboot(self):
    """
    Sends REBOOT and waits for the ACK via hlpRebootShutdownWaitForAck.

    Returns True on success, otherwise the failure status of the send or
    the wait step.
    """
    fSent = self.sendMsg('REBOOT');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
931
def taskShutdown(self):
    """
    Sends SHUTDOWN and waits for the ACK via hlpRebootShutdownWaitForAck.

    Returns True on success, otherwise the failure status of the send or
    the wait step.
    """
    fSent = self.sendMsg('SHUTDOWN');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
937
938 #
939 # CD/DVD control tasks.
940 #
941
942 ## TODO
943
944 #
945 # File system tasks
946 #
947
def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR with the mode and the directory name and waits for the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDIR');
953
def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH with the mode and the directory name and waits for the
    ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDRPATH');
959
def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends MKSYMLNK with the link target and the link name and waits for
    the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKSYMLNK');
965
def taskRmDir(self, sRemoteDir):
    """
    Sends RMDIR with the directory name and waits for the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('RMDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMDIR');
971
def taskRmFile(self, sRemoteFile):
    """
    Sends RMFILE with the file name and waits for the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('RMFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMFILE');
977
def taskRmSymlink(self, sRemoteSymlink):
    """
    Sends RMSYMLNK with the symlink name and waits for the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMSYMLNK');
983
def taskRmTree(self, sRemoteTree):
    """
    Sends RMTREE with the tree name and waits for the ACK.

    Returns True on success, otherwise the failure status of the send or
    the receive step.
    """
    fSent = self.sendMsg('RMTREE', (sRemoteTree,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMTREE');
989
990 #def "CHMOD "
991 #def "CHOWN "
992 #def "CHGRP "
993
def taskIsDir(self, sRemoteDir):
    """
    Sends ISDIR with the given name and waits for the TRUE/FALSE reply.

    Returns the recvTrueFalse result (True, False or None) if the request
    was sent, otherwise the sendMsg failure status.
    """
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
999
def taskIsFile(self, sRemoteFile):
    """
    Sends ISFILE with the given name and waits for the TRUE/FALSE reply.

    Returns the recvTrueFalse result (True, False or None) if the request
    was sent, otherwise the sendMsg failure status.
    """
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1005
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends ISSYMLNK with the given name and waits for the TRUE/FALSE reply.

    Returns the recvTrueFalse result (True, False or None) if the request
    was sent, otherwise the sendMsg failure status.
    """
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1011
1012 #def "STAT "
1013 #def "LSTAT "
1014 #def "LIST "
1015
def taskUploadFile(self, sLocalFile, sRemoteFile):
    """
    Uploads the local file sLocalFile to sRemoteFile on the TXS side.

    Returns False (with the error logged) if the local file cannot be
    opened, otherwise the taskUploadCommon status.
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr.  The file is closed even if the
    # upload code raises an exception.
    try:
        rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
    finally:
        # Cleanup.
        oLocalFile.close();
    return rc;
1033
def taskUploadString(self, sContent, sRemoteFile):
    """
    Uploads the string sContent as the remote file sRemoteFile.

    The string is wrapped in a small read()-only object and handed to
    taskUploadCommon(), whose result is returned.
    """
    class InStringFile(object): # pylint: disable=R0903
        """Minimal file-like reader over an in-memory string."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off = 0;

        def read(self, cbMax):
            """Returns up to cbMax characters from the current offset, "" when exhausted."""
            cbLeft = len(self.sContent) - self.off;
            if cbLeft == 0:
                return "";
            offEnd = self.off + min(cbLeft, cbMax);
            sRet = self.sContent[self.off:offEnd];
            self.off += len(sRet);
            return sRet;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile);
1054
def taskUploadCommon(self, oLocalFile, sRemoteFile):
    """
    Common worker used by taskUploadFile and taskUploadString.

    Sends 'PUT FILE', then streams oLocalFile in chunks of up to 64 KB as
    DATA packets, each carrying the running CRC-32 of the stream, and ends
    with a 'DATA EOF' packet.  Every packet must be ACKed by the other side.

    Returns True on success.  Returns the sendMsg()/recvAckLogged() result if
    the initial command fails, and False on later failures.  A read error on
    oLocalFile triggers an ABORT message before False is returned.
    """
    # Command + ACK.
    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
    if rc is True:
        rc = self.recvAckLogged('PUT FILE');
    if rc is not True:
        return rc;

    #
    # Push data packets until eof.
    #
    uMyCrc32 = zlib.crc32("");
    while True:
        # Read up to 64 KB of data; rc=None marks an I/O error for the ABORT below.
        try:
            sChunk = oLocalFile.read(65536);
        except:
            rc = None;
            break;

        # Convert to a byte array (what sendMsg is given as payload).
        abBuf = array.array('B', [ord(ch) for ch in sChunk]);
        sChunk = None;
        fEof = len(abBuf) == 0;

        # Update the file stream CRC and send it off.
        uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
        uCrcField = long(uMyCrc32 & 0xffffffff);
        if fEof:
            rc = self.sendMsg('DATA EOF', (uCrcField, ));
        else:
            rc = self.sendMsg('DATA ', (uCrcField, abBuf));
        if rc is False:
            break;

        # Wait for the reply.
        rc = self.recvAck();
        if rc is not True:
            if rc is False:
                reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
            else:
                reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                rc = False;
            break;

        # EOF?
        if fEof:
            break;

    # Send ABORT on I/O errors (rc is None only after a failed read).
    if rc is None:
        if self.sendMsg('ABORT') is True:
            self.recvAckLogged('ABORT');
        rc = False;
    return rc;
1110
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile via
    taskDownloadCommon().

    Returns False (after logging) if the local file cannot be opened,
    otherwise whatever taskDownloadCommon() returns.  When that result is
    False the local file is removed again.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Close the handle even when the worker raises instead of returning.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    # Don't leave a partial download behind.
    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1127
def taskDownloadString(self, sRemoteFile):
    """
    Downloads sRemoteFile into memory via taskDownloadCommon().

    Returns the concatenated data written by the worker when it returns
    True, otherwise the worker's result unchanged.
    """
    class OutStringFile(object): # pylint: disable=R0903
        """Minimal file-like sink collecting written chunks in a list."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            """Appends sBuf to the collected chunks."""
            self.asContent.append(sBuf);
            return None;

    oSink = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oSink);
    if rc is not True:
        return rc;
    # Joining an empty list yields '' as well, so no special case is needed.
    return ''.join(oSink.asContent);
1146
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' and then processes DATA / DATA EOF replies, checking
    the running CRC-32 carried in the first four payload bytes and writing
    the remaining payload to oLocalFile.  Each good packet is ACKed.

    Returns the sendMsg() result of the last ACK (or of the initial command
    if that failed), and False on error replies, validation failures and
    write errors.  Validation and I/O errors additionally send a NACK.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is not True:
        return rc;

    #
    # Process data packets until eof.
    #
    uMyCrc32 = zlib.crc32("");
    while rc is True:
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is None:
            reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
            rc = None;
            break;

        # Validate.
        sOpcode = sOpcode.rstrip();
        fIsData = sOpcode == 'DATA';
        fIsEof = sOpcode == 'DATA EOF';
        if not fIsData and not fIsEof:
            reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                              % (sOpcode, getSZ(abPayload, 0, "None")));
            rc = False;
            break;
        if fIsData and len(abPayload) < 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
            rc = None;
            break;
        if fIsEof and len(abPayload) != 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
            rc = None;
            break;

        # Check the CRC (common for both packets).
        uCrc32 = getU32(abPayload, 0);
        if fIsData:
            uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
        if uCrc32 != (uMyCrc32 & 0xffffffff):
            reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                              % (hex(uMyCrc32), hex(uCrc32)));
            rc = None;
            break;
        if fIsEof:
            rc = self.sendMsg('ACK');
            break;

        # Finally, push the data to the file.
        try:
            oLocalFile.write(abPayload[4:].tostring());
        except:
            reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
            rc = None;
            break;
        rc = self.sendMsg('ACK');

    # Send NACK on validation and I/O errors; the NACK send result is not used.
    if rc is None:
        self.sendMsg('NACK');
        rc = False;
    return rc;
1205
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends an UNPKFILE request carrying (sRemoteFile, sRemoteDir).

    Returns the sendMsg() result when it is not True, otherwise the result
    of recvAckLogged('UNPKFILE').
    """
    rcSend = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if rcSend is not True:
        return rcSend;
    return self.recvAckLogged('UNPKFILE');
1211
1212 # pylint: enable=C0111
1213
1214
1215 #
1216 # Public methods - generic task queries
1217 #
1218
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    Success means sStatus is the empty string and oTaskRc is neither False
    nor None; both are sampled while holding the task lock.
    """
    self.lockTask();
    sCurStatus, oCurRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return sCurStatus == "" and oCurRc is not False and oCurRc is not None;
1230
def getResult(self):
    """
    Returns the result of a completed task.
    Returns None if not completed yet or no previous task.

    "Completed" is judged by sStatus being the empty string; the value is
    sampled together with oTaskRc while holding the task lock.
    """
    self.lockTask();
    sCurStatus, oCurRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    if sCurStatus == "":
        return oCurRc;
    return None;
1243
def getLastReply(self):
    """
    Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
    Returns a None, None, None three-tuple if there was no last reply.

    The value of self.t3oReply is read while holding the task lock.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1253
1254 #
1255 # Public methods - connection.
1256 #
1257
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a disconnect task ("bye", run by taskBye).

    Returns True on success, False on failure (logged).

    The task returns True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1267
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect, run through asyncToSync()."""
    fnAsync = self.asyncDisconnect;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1271
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The task is registered under its own name; it used to be started as
    # "bye", a copy & paste leftover from asyncDisconnect.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1281
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid, run through asyncToSync()."""
    fnAsync = self.asyncUuid;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1285
1286 #
1287 # Public methods - execution.
1288 #
1289
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates an exec process task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    The oStdIn, oStdOut, oStdErr and oTestPipe specify how to deal with
    these streams.  If None, no special action is taken and the output goes
    to wherever the TXS sends its output, and ditto for input.
        - To send to / read from the bitbucket, pass '/dev/null'.
        - To redirect to/from a file, just specify the remote filename.
        - To append to a file use '>>' followed by the remote filename.
        - To pipe the stream to/from the TXS, specify a file like
          object.  For StdIn a non-blocking read() method is required.  For
          the other a write() method is required.  Watch out for deadlock
          conditions between StdIn and StdOut/StdErr/TestPipe piping.
    """
    # Argument tuple for taskExecEx; the second entry is always long(0).
    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                  oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1318
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx, run through asyncToSync().

    Unlike the asynchronous variant, all four streams default to '/dev/null'.
    """
    fnAsync = self.asyncExecEx;
    return self.asyncToSync(fnAsync, sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
                            oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1326
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Initiates an exec process test task.

    Returns True on success, False on failure (logged).

    The task returns True if the process exited normally with status code 0.
    The task returns None on failure prior to executing the process, and
    False if the process exited with a different status or in an abnormal
    manner.  Both None and False are logged of course and further info can
    also be obtained by getLastReply().

    Standard in is taken from /dev/null.  Standard output and standard
    error are wrapped in reporter.FileWrapper objects named '<sPrefix>stdout'
    and '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe object when fWithTestPipe is set, otherwise
    to /dev/null.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    aoTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aoTaskArgs);
1353
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec, run through asyncToSync()."""
    fnAsync = self.asyncExec;
    return self.asyncToSync(fnAsync, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix,
                            cMsTimeout, fIgnoreErrors);
1359
1360 #
# Public methods - system (reboot / shutdown)
1362 #
1363
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a reboot task ("reboot", run by taskReboot without arguments).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).  The
    session will be disconnected on successful task completion.
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, aoTaskArgs);
1374
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot, run through asyncToSync()."""
    fnAsync = self.asyncReboot;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1378
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a shutdown task ("shutdown", run by taskShutdown without
    arguments).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, aoTaskArgs);
1388
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown, run through asyncToSync()."""
    fnAsync = self.asyncShutdown;
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors);
1392
1393
1394 #
1395 # Public methods - file system
1396 #
1397
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir task.

    fMode is converted with long() before being handed to taskMkDir; the
    default is octal 700.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    # Note: the default is spelled 0o700 (same value as the old 0700) so the
    #       signature parses on both Python 2.6+ and Python 3.
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1407
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDir, run through asyncToSync().

    The default mode is spelled 0o700 (same value as the old 0700) so the
    signature parses on both Python 2.6+ and Python 3.
    """
    return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1411
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir -p task.

    fMode is converted with long() before being handed to taskMkDirPath; the
    default is octal 700.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    # Note: the default is spelled 0o700 (same value as the old 0700) so the
    #       signature parses on both Python 2.6+ and Python 3.
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1421
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDirPath, run through asyncToSync().

    The default mode is spelled 0o700 (same value as the old 0700) so the
    signature parses on both Python 2.6+ and Python 3.
    """
    return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1425
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a symlink task ("mkSymlink", run by taskMkSymlink).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, aoTaskArgs);
1435
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink, run through asyncToSync()."""
    fnAsync = self.asyncMkSymlink;
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1439
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmdir task ("rmDir", run by taskRmDir).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, aoTaskArgs);
1449
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir, run through asyncToSync()."""
    fnAsync = self.asyncRmDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1453
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmfile task ("rmFile", run by taskRmFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, aoTaskArgs);
1463
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile, run through asyncToSync()."""
    fnAsync = self.asyncRmFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1467
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmsymlink task ("rmSymlink", run by taskRmSymlink).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, aoTaskArgs);
1477
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink, run through asyncToSync()."""
    fnAsync = self.asyncRmSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1481
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a rmtree task ("rmTree", run by taskRmTree).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, aoTaskArgs);
1491
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree, run through asyncToSync()."""
    fnAsync = self.asyncRmTree;
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors);
1495
1496 #def "CHMOD "
1497 #def "CHOWN "
1498 #def "CHGRP "
1499
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-dir query task ("isDir", run by taskIsDir).

    Returns True on success, False on failure (logged).

    The task returns True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    aoTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, aoTaskArgs);
1510
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir, run through asyncToSync()."""
    fnAsync = self.asyncIsDir;
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors);
1514
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-file query task ("isFile", run by taskIsFile).

    Returns True on success, False on failure (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, aoTaskArgs);
1525
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile, run through asyncToSync()."""
    fnAsync = self.asyncIsFile;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1529
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an is-symbolic-link query task ("isSymlink", run by
    taskIsSymlink).

    Returns True on success, False on failure (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    aoTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, aoTaskArgs);
1540
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink, run through asyncToSync()."""
    fnAsync = self.asyncIsSymlink;
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1544
1545 #def "STAT "
1546 #def "LSTAT "
1547 #def "LIST "
1548
def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an upload file task ("upload", run by taskUploadFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sLocalFile, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, aoTaskArgs);
1558
def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadFile, run through asyncToSync()."""
    fnAsync = self.asyncUploadFile;
    return self.asyncToSync(fnAsync, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1562
def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an upload string task ("uploadString", run by
    taskUploadString).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sContent, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, aoTaskArgs);
1572
def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadString, run through asyncToSync()."""
    fnAsync = self.asyncUploadString;
    return self.asyncToSync(fnAsync, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1576
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a download file task ("downloadFile", run by
    taskDownloadFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, aoTaskArgs);
1586
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile, run through asyncToSync()."""
    fnAsync = self.asyncDownloadFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1590
def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a download string task ("downloadString", run by
    taskDownloadString).

    Returns True on success, False on failure (logged).

    The task returns a byte string on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, aoTaskArgs);
1600
def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString, run through asyncToSync()."""
    fnAsync = self.asyncDownloadString;
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors);
1604
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates an unpack file task ("unpackFile", run by taskUnpackFile).

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    aoTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, aoTaskArgs);
1615
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile, run through asyncToSync()."""
    fnAsync = self.asyncUnpackFile;
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1619
1620
1621class TransportTcp(TransportBase):
1622 """
1623 TCP transport layer for the TXS client session class.
1624 """
1625
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Save the parameters.  The session will call us back to make the
    connection later on its worker thread.

    When uPort is None the port defaults to 5042, or to 5048 if
    fReversedSetup is anything but False.
    """
    TransportBase.__init__(self, utils.getCallerName());
    self.sHostname = sHostname;
    self.fReversedSetup = fReversedSetup;
    if uPort is not None:
        self.uPort = uPort;
    elif fReversedSetup is False:
        self.uPort = 5042;
    else:
        self.uPort = 5048;

    # Connection state; nothing is opened here.
    self.oSocket = None;
    self.oWakeupW = None;
    self.oWakeupR = None;
    self.fConnectCanceled = False;
    self.fIsConnecting = False;
    self.oCv = threading.Condition();
    self.abReadAhead = array.array('B');
1642
def toString(self):
    """Returns a string describing the object state, including TransportBase.toString()."""
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    aoValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
                self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFormat % aoValues;
1648
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    Returns True if oXcpt is a socket.error whose errno is EINPROGRESS or
    EWOULDBLOCK (the latter is what Windows reports), otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;
    # Use the errno attribute rather than indexing the exception; indexing
    # only works on Python 2 and used to be hidden by blanket excepts.
    iErrno = getattr(oXcpt, 'errno', None);
    if iErrno is None:
        return False;
    return iErrno in (getattr(errno, 'EINPROGRESS', None), getattr(errno, 'EWOULDBLOCK', None));
1665
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    Returns True if oXcpt is a socket.error whose errno is EWOULDBLOCK or
    EAGAIN, otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;
    # Use the errno attribute rather than indexing the exception; indexing
    # only works on Python 2 and used to be hidden by blanket excepts.
    iErrno = getattr(oXcpt, 'errno', None);
    if iErrno is None:
        return False;
    return iErrno in (getattr(errno, 'EWOULDBLOCK', None), getattr(errno, 'EAGAIN', None));
1681
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    Returns True if oXcpt is a socket.error whose errno is ECONNRESET or
    ENETRESET, otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;
    # Use the errno attribute rather than indexing the exception; indexing
    # only works on Python 2 and used to be hidden by blanket excepts.
    iErrno = getattr(oXcpt, 'errno', None);
    if iErrno is None:
        return False;
    return iErrno in (getattr(errno, 'ECONNRESET', None), getattr(errno, 'ENETRESET', None));
1697
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets.  Caller should own the CV.

    Each attribute is cleared before its socket is closed; the read end
    goes first.  Always returns None.
    """
    oReadEnd, self.oWakeupR = self.oWakeupR, None;
    if oReadEnd is not None:
        oReadEnd.close();

    oWriteEnd, self.oWakeupW = self.oWakeupW, None;
    if oWriteEnd is not None:
        oWriteEnd.close();

    return None;
1711
def cancelConnect(self):
    """
    Cancels a connect that may be in progress.

    Sets fConnectCanceled and, if a connect is in progress, closes the
    socket and pokes the write end of the wakeup socket pair before shutting
    it down and closing it.  All of this happens while holding self.oCv,
    which is released even if one of the socket operations raises.
    """
    # This is bad stuff.
    self.oCv.acquire();
    try:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
        self.fConnectCanceled = True;
        if self.fIsConnecting:
            oSocket = self.oSocket;
            self.oSocket = None;
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket');
                oSocket.close();

            oWakeupW = self.oWakeupW;
            self.oWakeupW = None;
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call');
                try:
                    oWakeupW.send('cancelled!\n');
                except:
                    reporter.logXcpt();
                try:
                    oWakeupW.shutdown(socket.SHUT_WR);
                except:
                    reporter.logXcpt();
                oWakeupW.close();
    finally:
        # Never leave the condition variable locked behind us.
        self.oCv.release();
1734
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (sHostname, uPort), listens and accepts one connection,
    waiting up to cMsTimeout milliseconds in select() when the accept would
    block.  oWakeupR is included in the select so a cancel can wake us up.

    Returns True when a client connected (the client socket replaces
    self.oSocket unless the connect was cancelled), False when the accept
    timed out, and None on failure or cancellation.  socket.error exceptions
    that are neither "in progress" nor a cancel-induced EBADF are re-raised.
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        # This used to be reported as a listen() failure, which was misleading.
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            if oXcptSelect.args[0] != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptAccept:
            if not self.__isInProgressXcpt(oXcptAccept):
                if oXcptAccept.args[0] != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    try:
        if not self.fConnectCanceled:
            self.oSocket.close();
            self.oSocket = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
    finally:
        self.oCv.release();
    return True;
1807
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Starts a connect on oSocket to (sHostname, uPort) and, if it is reported
    as in progress, waits up to cMsTimeout milliseconds in select() for the
    socket to become writable (oWakeupR is watched too so a cancel can wake
    us up).

    Returns True when connected and False for the errors listed below that
    are worth retrying.  Other failures are logged via fatalXcpt (except a
    cancel-induced EBADF) and leave rc at None, or at False when it was
    SO_ERROR that reported the failure.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error, e:
        iRc = e[0];
        if self.__isInProgressXcpt(e):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (e,));
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                # Nothing writable and nothing exceptional is treated as a timeout.
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                # SO_ERROR holds the outcome of the asynchronous connect.
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error, e:
                iRc = e[0];
            except:
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        # Classify the outcome: success, retryable error, or fatal.
        if rc is True:
            pass;
        elif iRc == errno.ECONNREFUSED \
          or iRc == errno.EHOSTUNREACH \
          or iRc == errno.EINTR \
          or iRc == errno.ENETDOWN \
          or iRc == errno.ENETUNREACH \
          or iRc == errno.ETIMEDOUT:
            rc = False; # try again.
        else:
            # EBADF after a cancel is expected and not worth a fatal log entry.
            if iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
1850
1851
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket and connects it, either as server
    (reversed setup) or as client.

    Returns True when connected.  Returns None if the socket could not be
    created or configured, or if the connect had already been cancelled.
    Otherwise returns the result of _connectAsServer() / _connectAsClient(),
    turned into False if the connect was cancelled while it ran.  On any
    result other than True the sockets are closed again.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        oSocket.close();
        # This used to be reported as a socket() failure, which was misleading.
        reporter.fatalXcpt('socket.setblocking(0) failed');
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:
            (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=E1101
        except:
            reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    self.oCv.acquire();
    rc = None;
    if not self.fConnectCanceled:
        self.oSocket = oSocket;
        self.oWakeupW = oWakeupW;
        self.oWakeupR = oWakeupR;
        self.fIsConnecting = True;
        self.oCv.release();

        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
        oSocket = None;

        # Update the state and cleanup on failure/cancel.
        self.oCv.acquire();
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;
    else:
        # Cancelled before we got started: the sockets created above were
        # never stored in self, so close them here or they would leak.
        for oUnused in (oSocket, oWakeupR, oWakeupW):
            if oUnused is not None:
                try:
                    oUnused.close();
                except:
                    reporter.logXcpt('TransportTcp::connect: closing unused socket');
        oSocket = None;

    if rc is not True:
        if self.oSocket is not None:
            self.oSocket.close();
            self.oSocket = None;
        self._closeWakeupSockets();
    self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
1908
def disconnect(self, fQuiet = False):
    """
    Disconnects: shuts down and drains the socket (if any), closes it and
    then closes the wakeup sockets.

    fQuiet suppresses the error that is otherwise logged when the
    shutdown(SHUT_WR) call fails.
    """
    if self.oSocket is None:
        # Nothing connected, only the wakeup sockets need attention.
        self.oCv.acquire();
    else:
        self.abReadAhead = array.array('B');

        # Try shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0); # just in case it's not set.
            sData = "1";
            while len(sData) > 0:
                sData = self.oSocket.recv(16384);
        except:
            pass;

        # Close it.
        self.oCv.acquire();
        try:
            self.oSocket.setblocking(1);
        except:
            pass;
        self.oSocket.close();
        self.oSocket = None;

    self._closeWakeupSockets();
    self.oCv.release();
1937
def sendBytes(self, abMsg, cMsTimeout):
    """
    Sends abMsg over the connected socket, waiting at most cMsTimeout
    milliseconds for the socket to accept all of it.

    Returns True once every byte has been handed to the socket, and False
    (after logging) when there is no connection, on timeout, or on a send or
    select failure other than "would block".
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbSent = self.oSocket.send(abMsg);
        if cbSent == len(abMsg):
            return True;
    except Exception, oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
            return False;
        # Would block: nothing went out yet, fall through to the timed loop.
        cbSent = 0;

    # Do a timed send.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abMsg)));
            break;

        # wait.
        try:
            ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            # Exceptional condition without the socket being writable.
            if len(ttRc[2]) > 0 and len(ttRc[1]) == 0:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                break;
            # Neither writable nor exceptional: select ran out of time.
            if len(ttRc[1]) == 0:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abMsg)));
                break;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            break;

        # Try send more.
        try:
            cbSent += self.oSocket.send(abMsg[cbSent:]);
            if cbSent == len(abMsg):
                return True;
        except Exception, oXcpt:
            # A would-block here simply means another round of waiting.
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
                break;

    return False;
1986
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from the read-ahead buffer and returns them.
    The caller must make sure the buffer holds at least cb bytes.
    """
    abBuffered = self.abReadAhead;
    assert(cb <= len(abBuffered));
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
1993
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes, waiting at most cMsTimeout milliseconds.

    Data is accumulated in the read-ahead buffer.  A plain recv() is tried
    first; if that does not complete the request, a select() loop keeps
    reading until cb bytes are buffered or the timeout expires.  Bytes
    received for a request that ends up failing are left in the read-ahead
    buffer, except when the peer has shut down the connection, in which case
    disconnect() is called.

    fNoDataOk suppresses the error report for a timeout, and for a
    connection reset, when no data at all has been buffered.

    Returns an array of cb bytes taken from the front of the read-ahead
    buffer, or None on failure, timeout or when there is no connection.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if len(abBuf) > 0:
                self.abReadAhead.extend(array.array('B', abBuf));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            if not fNoDataOk or len(self.abReadAhead) > 0:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            break;

        # Wait.
        try:
            ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            # An exceptional condition only counts as fatal if there is nothing to read.
            if len(ttRc[2]) > 0 and len(ttRc[0]) == 0:
                reporter.error('TranportTcp.recvBytes: select returned with exception');
                break;
            if len(ttRc[0]) == 0:
                if not fNoDataOk or len(self.abReadAhead) > 0:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                break;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            break;

        # Try read more.
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if len(abBuf) == 0:
                # select() said readable but recv() returned nothing: the peer closed the connection.
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abBuf));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or len(self.abReadAhead) > 0:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                break;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

    #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
    return None;
2061
def isConnectionOk(self):
    """
    Checks whether the connection still looks usable.

    Returns False if there is no socket, if select() reports an exceptional
    condition on it, or if the select() or a zero byte send() raises.
    Returns True otherwise.
    """
    fOk = False;
    if self.oSocket is not None:
        try:
            aoInError = select.select([], [], [self.oSocket], 0.0)[2];
            if len(aoInError) == 0:
                self.oSocket.send(array.array('B')); # send zero bytes.
                fOk = True;
        except:
            fOk = False;
    return fOk;
2074
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket becomes readable within cMsTimeout milliseconds.

    Returns False only if select() completes without reporting the socket
    as readable.  Returns True if it is readable, and also if select()
    raises.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return len(aoReadable) != 0;
2083
2084
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp object is created for sHostname, uPort and fReversedSetup
    and wrapped in a Session using cMsTimeout and cMsIdleFudge.

    Returns the Session object.  If creating the transport or the session
    raises, the exception is reported and None is returned.
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)'
                  % (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    oSession = None;
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge);
    except:
        reporter.errorXcpt(None, 15);
    return oSession;
2098
2099
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its
    name.

    Works like openTcpSession, except that the Session is created with
    fTryConnect = True so that a connection failure isn't logged as an error.

    Returns the Session object.  If creating the transport or the session
    raises, the exception is reported and None is returned.
    """
    oSession = None;
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup),
                           cMsTimeout, cMsIdleFudge, fTryConnect = True);
    except:
        reporter.errorXcpt(None, 15);
    return oSession;
2114
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette