VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 60853

Last change on this file since 60853 was 60853, checked in by vboxsync, 9 years ago

pylint fix

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 78.8 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 60853 2016-05-05 17:58:55Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2015 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 60853 $"
30
31# Standard Python imports.
32import array
33import errno
34import os
35import select
36import socket
37import threading
38import time
39import types
40import zlib
41import uuid
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49#
50# Helpers for decoding data received from the TXS.
51# These are used by both the Session and Transport classes.
52#
53
def getU32(abData, off):
    """
    Decode a little endian unsigned 32-bit field.

    Combines the four consecutive items abData[off] .. abData[off + 3],
    least significant byte first, into a single integer.
    """
    u32 = 0
    for iByte in range(4):
        u32 += abData[off + iByte] * (256 ** iByte)
    return u32
60
def getSZ(abData, off, sDefault = None):
    """
    Decode the zero-terminated UTF-8 string field starting at offset off.

    Returns the decoded string.  Returns sDefault when getSZLen reports a
    negative length for the field, or when decoding raises (the exception
    is logged through reporter.errorXcpt).
    """
    cbStr = getSZLen(abData, off)
    if cbStr < 0:
        return sDefault

    abStr = abData[off:(off + cbStr)]
    try:
        sRet = abStr.tostring().decode('utf_8')
    except:
        reporter.errorXcpt('getSZ(,%u)' % (off))
        return sDefault
    return sRet
74
def getSZLen(abData, off):
    """
    Measure a zero-terminated string field, in bytes (terminator excluded).

    Returns -1 when off lies at or beyond the end of abData, or when no
    zero item is found between off and the end of abData.
    """
    cbData = len(abData)
    offCur = off
    while offCur < cbData:
        if abData[offCur] == 0:
            return offCur - off
        offCur += 1
    return -1
91
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    An opcode is exactly 8 characters long.  The first two characters must
    be upper case letters; the remaining six may be upper case letters,
    digits, '-', '_' or space (space is what shorter opcodes are padded
    with).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ "
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)))
        return False

    # The previous ranges, range(0, 1) and range(2, 7), treated the end
    # values as inclusive and thereby never looked at characters #1 and #7.
    for i in range(0, 8):
        sAllowed = sSet1 if i < 2 else sSet2
        if sAllowed.find(sOpcode[i]) < 0:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode))
            return False
    return True
112
113#
114# Helper for encoding data sent to the TXS.
115#
116
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Only the low 32 bits of the value are encoded.  Shifts and masks are
    used rather than '/' so the result does not depend on '/' performing
    integer division; negative inputs (e.g. a signed CRC-32) encode as
    their two's complement bit pattern, same as the modulo arithmetic did.
    """
    return array.array('B', [(u32 >> cShift) & 0xff for cShift in (0, 8, 16, 24)])
124
125
126
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses override the connection primitives (connect, disconnect,
    sendBytes, recvBytes, isConnectionOk and isRecvPending).  The message
    framing implemented by sendMsgInt, recvMsg and sendMsg sits on top of
    those primitives.

    Wire format as produced/consumed here: a 16 byte header consisting of
    a little endian u32 message length (header + payload, padding
    excluded), a little endian u32 CRC-32 over opcode + payload (0 means
    "not checked" on receive) and an 8 character opcode, followed by the
    payload and zero padding up to a multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller)
        self.fDummy = 0
        # Header of a message whose payload could not be received; recvMsg
        # reuses it on the next call instead of reading a fresh header.
        self.abReadAheadHdr = array.array('B')

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated)

    def __str__(self):
        return self.toString()

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet
        return True

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout
        return False

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk
        # Failure is signalled with None: recvMsg tests the result with
        # 'is None' and would call len() on anything else.
        return None

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout
        return False

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters; padded with spaces to 8.
        cMsTimeout: Timeout handed on to sendBytes.
        abPayload:  The already encoded payload as array('B'); None (the
                    default) means no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        if abPayload is None:
            abPayload = array.array('B')

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode))
            return False
        sOpcode = sOpcode.ljust(8)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode))
            return False

        # Start constructing the message: length, CRC placeholder, opcode, payload.
        cbMsg = 16 + len(abPayload)
        abMsg = array.array('B')
        abMsg.extend(u32ToByteArray(cbMsg))
        abMsg.extend((0, 0, 0, 0))  # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]))
            if len(abPayload) > 0:
                abMsg.extend(abPayload)
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...')
            return False

        # Checksum it (opcode + payload, i.e. everything after the first
        # 8 bytes), pad it to a multiple of 16 bytes and send it off.
        uCrc32 = zlib.crc32(abMsg[8:])
        abMsg[4:8] = u32ToByteArray(uCrc32)

        while len(abMsg) % 16:
            abMsg.append(0)

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout))
        return self.sendBytes(abMsg, cMsTimeout)

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: Timeout handed on to recvBytes.
        fNoDataOk:  Handed on to recvBytes for the header read; also
                    suppresses the log entry when the payload read fails.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, unless one is left over from a previous call
        # that failed to get the payload.
        if len(self.abReadAheadHdr) > 0:
            assert(len(self.abReadAheadHdr) == 16)
            abHdr = self.abReadAheadHdr
            self.abReadAheadHdr = array.array('B')
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk)
            if abHdr is None:
                return (None, None, None)
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)))
                return (None, None, None)

        # Unpack and validate the header.
        cbMsg  = getU32(abHdr, 0)
        uCrc32 = getU32(abHdr, 4)
        try:
            sOpcode = abHdr[8:16].tostring().decode('ascii')
        except UnicodeDecodeError:
            # Garbage on the wire; report it like any other bad header
            # instead of letting the exception escape.
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abHdr[8:16],))
            return (None, None, None)

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg))
            return (None, None, None)
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg))
            return (None, None, None)
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode))
            return (None, None, None)

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B')
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16)
            else:
                cbPadding = 0
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False)
            if abPayload is None:
                # Keep the header so the next call resumes with the payload.
                self.abReadAheadHdr = abHdr
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!')
                return (None, None, None)

            while cbPadding > 0:
                abPayload.pop()
                cbPadding = cbPadding - 1

        # Check the CRC-32 (a zero CRC in the header means "not checked").
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:])
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32)
            uActualCrc32 = uActualCrc32 & 0xffffffff
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)))
                return (None, None, None)

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)))
        return (cbMsg, sOpcode, abPayload)

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Each payload item is encoded according to its type: strings as
        zero-terminated UTF-8, long integers as little endian uint32_t,
        and array.array objects are appended as they are.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B')
        for o in aoPayload:
            try:
                if isinstance(o, basestring):
                    abPayload.extend(array.array('B', o.encode('utf_8')))
                    abPayload.append(0)
                elif isinstance(o, types.LongType):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)))
                        return None
                    abPayload.extend(u32ToByteArray(o))
                elif isinstance(o, array.array):
                    abPayload.extend(o)
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload))
                    return None
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...')
                return None
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload)
374
375
376class Session(TdTaskBase):
377 """
378 A Test eXecution Service (TXS) client session.
379 """
380
381 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
382 """
383 Construct a TXS session.
384
385 This starts by connecting to the TXS and will enter the signalled state
386 when connected or the timeout has been reached.
387 """
388 TdTaskBase.__init__(self, utils.getCallerName());
389 self.oTransport = oTransport;
390 self.sStatus = "";
391 self.cMsTimeout = 0;
392 self.fErr = True; # Whether to report errors as error.
393 self.msStart = 0;
394 self.oThread = None;
395 self.fnTask = self.taskDummy;
396 self.aTaskArgs = None;
397 self.oTaskRc = None;
398 self.t3oReply = (None, None, None);
399 self.fScrewedUpMsgState = False;
400 self.fTryConnect = fTryConnect;
401
402 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
403 raise base.GenError("startTask failed");
404
405 def __del__(self):
406 """Make sure to cancel the task when deleted."""
407 self.cancelTask();
408
409 def toString(self):
410 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
411 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
412 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
413 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
414
415 def taskDummy(self):
416 """Place holder to catch broken state handling."""
417 raise Exception();
418
 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
     """
     Kicks off a new task.

     cMsTimeout:     The task timeout in milliseconds.  Values less than
                     500 ms will be adjusted to 500 ms.  This means it is
                     OK to use negative value.
     fIgnoreErrors:  When set, failures here are not reported as errors
                     (it is passed negated to reporter.maybeErr) and
                     self.fErr is cleared for the duration of the task.
     sStatus:        The task status.  Also used in the thread name.
     fnTask:         The method that'll execute the task.
     aArgs:          Arguments to pass to fnTask.

     Returns True on success, False + error in log on failure.
     """
     # Any previous task must be out of the way first.
     if not self.cancelTask():
         reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
         return False;

     # Claim the session under the lock: the status must still be idle (""),
     # and is switched to "setup" while the task state is prepared.
     self.lockTask();
     if self.sStatus != "":
         self.unlockTask();
         reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
         return False;
     self.sStatus = "setup";
     self.oTaskRc = None;
     self.t3oReply = (None, None, None);
     self.resetTaskLocked();
     self.unlockTask();

     # Prepare the task state and the (daemon) thread that will run it.
     self.cMsTimeout = max(cMsTimeout, 500);
     self.fErr = not fIgnoreErrors;
     self.fnTask = fnTask;
     self.aTaskArgs = aArgs;
     self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
     self.oThread.setDaemon(True);
     self.msStart = base.timestampMilli();

     # Publish the real status before the thread starts running.
     self.lockTask();
     self.sStatus = sStatus;
     self.unlockTask();
     self.oThread.start();

     return True;
462
 def cancelTask(self, fSync = True):
     """
     Attempts to cancel any pending tasks.

     fSync:  When set (default), wait up to 61 seconds for the task thread
             to terminate after flagging the task as cancelled.

     Returns success indicator (True/False).  As implemented: True when
     no task is active; False when the task is in the "setup" or
     "cancelled" state or when fSync is False; otherwise the value of
     oThread.isAlive() after the join.

     NOTE(review): the final return yields True when the thread is still
     alive after the join, which looks inverted with respect to the
     "success indicator" wording above - confirm before relying on it.
     """
     self.lockTask();

     if self.sStatus == "":
         # Nothing is running.
         self.unlockTask();
         return True;
     if self.sStatus == "setup":
         # startTask is in the middle of setting up a task.
         self.unlockTask();
         return False;
     if self.sStatus == "cancelled":
         # Already cancelled, the thread has not finished yet.
         self.unlockTask();
         return False;

     reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
     if self.sStatus == 'connecting':
         self.oTransport.cancelConnect();

     # The task thread polls for this status (see isCancelled).
     self.sStatus = "cancelled";
     oThread = self.oThread;
     self.unlockTask();

     if not fSync:
         return False;

     oThread.join(61.0);
     return oThread.isAlive();
493
494 def taskThread(self):
495 """
496 The task thread function.
497 This does some housekeeping activities around the real task method call.
498 """
499 if not self.isCancelled():
500 try:
501 fnTask = self.fnTask;
502 oTaskRc = fnTask(*self.aTaskArgs);
503 except:
504 reporter.fatalXcpt('taskThread', 15);
505 oTaskRc = None;
506 else:
507 reporter.log('taskThread: cancelled already');
508
509 self.lockTask();
510
511 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
512 self.oTaskRc = oTaskRc;
513 self.oThread = None;
514 self.sStatus = '';
515 self.signalTaskLocked();
516
517 self.unlockTask();
518 return None;
519
520 def isCancelled(self):
521 """Internal method for checking if the task has been cancelled."""
522 self.lockTask();
523 sStatus = self.sStatus;
524 self.unlockTask();
525 if sStatus == "cancelled":
526 return True;
527 return False;
528
529 def hasTimedOut(self):
530 """Internal method for checking if the task has timed out or not."""
531 cMsLeft = self.getMsLeft();
532 if cMsLeft <= 0:
533 return True;
534 return False;
535
536 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
537 """Gets the time left until the timeout."""
538 cMsElapsed = base.timestampMilli() - self.msStart;
539 if cMsElapsed < 0:
540 return cMsMin;
541 cMsLeft = self.cMsTimeout - cMsElapsed;
542 if cMsLeft <= cMsMin:
543 return cMsMin;
544 if cMsLeft > cMsMax and cMsMax > 0:
545 return cMsMax
546 return cMsLeft;
547
548 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
549 """
550 Wrapper for TransportBase.recvMsg that stashes the response away
551 so the client can inspect it later on.
552 """
553 if cMsTimeout == None:
554 cMsTimeout = self.getMsLeft(500);
555 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
556 self.lockTask();
557 self.t3oReply = (cbMsg, sOpcode, abPayload);
558 self.unlockTask();
559 return (cbMsg, sOpcode, abPayload);
560
561 def recvAck(self, fNoDataOk = False):
562 """
563 Receives an ACK or error response from the TXS.
564
565 Returns True on success.
566 Returns False on timeout or transport error.
567 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
568 and there are always details of some sort or another.
569 """
570 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
571 if cbMsg is None:
572 return False;
573 sOpcode = sOpcode.strip()
574 if sOpcode == "ACK":
575 return True;
576 return (sOpcode, getSZ(abPayload, 0, sOpcode));
577
578 def recvAckLogged(self, sCommand, fNoDataOk = False):
579 """
580 Wrapper for recvAck and logging.
581 Returns True on success (ACK).
582 Returns False on time, transport error and errors signalled by TXS.
583 """
584 rc = self.recvAck(fNoDataOk);
585 if rc is not True and not fNoDataOk:
586 if rc is False:
587 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
588 else:
589 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
590 rc = False;
591 return rc;
592
593 def recvTrueFalse(self, sCommand):
594 """
595 Receives a TRUE/FALSE response from the TXS.
596 Returns True on TRUE, False on FALSE and None on error/other (logged).
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply();
599 if cbMsg is None:
600 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
601 return None;
602
603 sOpcode = sOpcode.strip()
604 if sOpcode == "TRUE":
605 return True;
606 if sOpcode == "FALSE":
607 return False;
608 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
609 return None;
610
611 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
612 """
613 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
614 """
615 if cMsTimeout == None:
616 cMsTimeout = self.getMsLeft(500);
617 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
618
619 def asyncToSync(self, fnAsync, *aArgs):
620 """
621 Wraps an asynchronous task into a synchronous operation.
622
623 Returns False on failure, task return status on success.
624 """
625 rc = fnAsync(*aArgs);
626 if rc is False:
627 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
628 return rc;
629
630 rc = self.waitForTask(self.cMsTimeout + 5000);
631 if rc is False:
632 reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
633 self.cancelTask();
634 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
635 return False;
636
637 rc = self.getResult();
638 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
639 return rc;
640
641 #
642 # Connection tasks.
643 #
644
645 def taskConnect(self, cMsIdleFudge):
646 """Tries to connect to the TXS"""
647 while not self.isCancelled():
648 reporter.log2('taskConnect: connecting ...');
649 rc = self.oTransport.connect(self.getMsLeft(500));
650 if rc is True:
651 reporter.log('taskConnect: succeeded');
652 return self.taskGreet(cMsIdleFudge);
653 if rc is None:
654 reporter.log2('taskConnect: unable to connect');
655 return None;
656 if self.hasTimedOut():
657 reporter.log2('taskConnect: timed out');
658 if not self.fTryConnect:
659 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
660 return False;
661 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
662 if not self.fTryConnect:
663 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
664 return False;
665
666 def taskGreet(self, cMsIdleFudge):
667 """Greets the TXS"""
668 rc = self.sendMsg("HOWDY", ());
669 if rc is True:
670 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
671 if rc is True:
672 while cMsIdleFudge > 0:
673 cMsIdleFudge -= 1000;
674 time.sleep(1);
675 else:
676 self.oTransport.disconnect(self.fTryConnect);
677 return rc;
678
679 def taskBye(self):
680 """Says goodbye to the TXS"""
681 rc = self.sendMsg("BYE");
682 if rc is True:
683 rc = self.recvAckLogged("BYE");
684 self.oTransport.disconnect();
685 return rc;
686
687 def taskUuid(self):
688 """Gets the TXS UUID"""
689 rc = self.sendMsg("UUID");
690 if rc is True:
691 rc = False;
692 cbMsg, sOpcode, abPayload = self.recvReply();
693 if cbMsg is not None:
694 sOpcode = sOpcode.strip()
695 if sOpcode == "ACK UUID":
696 sUuid = getSZ(abPayload, 0);
697 if sUuid is not None:
698 sUuid = '{%s}' % (sUuid,)
699 try:
700 _ = uuid.UUID(sUuid);
701 rc = sUuid;
702 except:
703 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
704 else:
705 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
706 else:
707 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
708 else:
709 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
710 return rc;
711
712 #
713 # Process task
714 # pylint: disable=C0111
715 #
716
 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
     """
     Executes a program via the EXEC command and pumps its standard
     streams until a PROC status message arrives or something fails.

     sExecName:  The program to execute (sent as a string).
     fFlags:     Flags, sent as the first payload item.
     asArgs:     The arguments; their count and then each one are sent.
     asAddEnv:   Environment changes; their count and then each one are sent.
     oStdIn, oStdOut, oStdErr, oTestPipe:
                 Stream specifications.  A string is sent as it is, None
                 is sent as '', and any other object is sent as '|' and
                 used as a file-like object: oStdIn is read() from, the
                 others are written to.  Such objects get a temporary
                 uTxsClientCrc32 attribute and are closed before returning.
     sAsUser:    The user to run as (sent as a string).

     Returns True if the final status opcode is 'PROC OK', False for any
     other PROC status, and None in all other cases (EXEC not sent or
     not acknowledged, or the I/O loop ended with a failure).  On the
     failure paths self.t3oReply is replaced by (0, 'EXECFAIL', sFailure).
     """
     # Construct the payload.
     aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
     for sArg in asArgs:
         aoPayload.append('%s' % (sArg));
     aoPayload.append(long(len(asAddEnv)));
     for sPutEnv in asAddEnv:
         aoPayload.append('%s' % (sPutEnv));
     for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
         if isinstance(o, basestring):
             aoPayload.append(o);
         elif o is not None:
             # File-like object: ask for a pipe and start its running stream CRC.
             aoPayload.append('|');
             o.uTxsClientCrc32 = zlib.crc32('');
         else:
             aoPayload.append('');
     aoPayload.append('%s' % (sAsUser));
     aoPayload.append(long(self.cMsTimeout));

     # Kick off the EXEC command.
     rc = self.sendMsg('EXEC', aoPayload)
     if rc is True:
         rc = self.recvAckLogged('EXEC');
     if rc is True:
         # Loop till the process completes, feed input to the TXS and
         # receive output from it.
         sFailure = "";
         msPendingInputReply = None;        # Timestamp of the last unanswered STDIN packet, if any.
         cbMsg, sOpcode, abPayload = (None, None, None);
         while True:
             # Pending input?  Only one STDIN packet is in flight at a time.
             if msPendingInputReply is None \
               and oStdIn is not None \
               and not isinstance(oStdIn, basestring):
                 try:
                     abInput = oStdIn.read(65536);
                 except:
                     reporter.errorXcpt('read standard in');
                     sFailure = 'exception reading stdin';
                     rc = None;
                     break;
                 if len(abInput) > 0:
                     # The packet carries the running CRC of the whole stream so far.
                     oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                     rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                     if rc is not True:
                         sFailure = 'sendMsg failure';
                         break;
                     msPendingInputReply = base.timestampMilli();
                     continue;

             # Wait for input (500 ms timeout).
             if cbMsg is None:
                 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
             if cbMsg == None:
                 # Check for time out before restarting the loop.
                 # Note! Only doing timeout checking here does mean that
                 #       the TXS may prevent us from timing out by
                 #       flooding us with data.  This is unlikely though.
                 if self.hasTimedOut() \
                   and (   msPendingInputReply is None \
                        or base.timestampMilli() - msPendingInputReply > 30000):
                     reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                     sFailure = 'timeout';
                     rc = None;
                     break;
                 # Check that the connection is OK.
                 if not self.oTransport.isConnectionOk():
                     self.oTransport.disconnect();
                     sFailure = 'disconnected';
                     rc = False;
                     break;
                 continue;

             # Handle the response.  Output opcodes map to their stream object.
             sOpcode = sOpcode.rstrip();
             if sOpcode == 'STDOUT':
                 oOut = oStdOut;
             elif sOpcode == 'STDERR':
                 oOut = oStdErr;
             elif sOpcode == 'TESTPIPE':
                 oOut = oTestPipe;
             else:
                 oOut = None;
             if oOut is not None:
                 # Output from the process: u32 stream CRC followed by the data.
                 if len(abPayload) < 4:
                     sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                     reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                     rc = None;
                     break;
                 uStreamCrc32 = getU32(abPayload, 0);
                 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                     sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                         % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                     reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                     rc = None;
                     break;
                 try:
                     oOut.write(abPayload[4:]);
                 except:
                     sFailure = 'exception writing %s' % (sOpcode);
                     reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                     rc = None;
                     break;
             elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                 # Standard input is ignored.  Ignore this condition for now.
                 # Replacing oStdIn with a string stops further reads above.
                 msPendingInputReply = None;
                 reporter.log('taskExecEx: Standard input is ignored... why?');
                 del oStdIn.uTxsClientCrc32;
                 oStdIn = '/dev/null';
             elif  (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
               and msPendingInputReply is not None:
                 # TXS STDIN error, abort.
                 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitting it.
                 # NOTE(review): the same "out of memory" text is used for all three opcodes.
                 msPendingInputReply = None;
                 sFailure = 'TXS is out of memory for std input buffering';
                 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                 rc = None;
                 break;
             elif sOpcode == 'ACK' and msPendingInputReply is not None:
                 # The pending STDIN packet was accepted.
                 msPendingInputReply = None;
             elif sOpcode.startswith('PROC '):
                 # Process status message, handle it outside the loop.
                 rc = True;
                 break;
             else:
                 sFailure = 'Unexpected opcode %s' % (sOpcode);
                 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                 rc = None;
                 break;
             # Clear the message.
             cbMsg, sOpcode, abPayload = (None, None, None);

         # If we sent an STDIN packet and didn't get a reply yet, we'll give
         # TXS some 5 seconds to reply to this.  If we don't wait here we'll
         # get screwed later on if we mix it up with the reply to some other
         # command.  Hackish.
         if msPendingInputReply is not None:
             cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
             if cbMsg2 is not None:
                 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                              % (cbMsg2, sOpcode2, abPayload2));
                 msPendingInputReply = None;
             else:
                 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                 self.fScrewedUpMsgState = True;

         # Parse the exit status (True), abort (None) or do nothing (False).
         if rc is True:
             if sOpcode == 'PROC OK':
                 rc = True;
             else:
                 # Do proper parsing some other day if needed:
                 #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                 #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                 rc = False;
         else:
             if rc is None:
                 # Abort it, then consume replies until the PROC status shows up.
                 reporter.log('taskExecEx: sending ABORT...');
                 rc = self.sendMsg('ABORT');
                 while rc is True:
                     cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                     if cbMsg is None:
                         reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                         self.fScrewedUpMsgState = True;
                         break;
                     if sOpcode.startswith('PROC '):
                         reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                         break;
                     reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                     # Check that the connection is OK before looping.
                     if not self.oTransport.isConnectionOk():
                         self.oTransport.disconnect();
                         break;

             # Fake response with the reason why we quit.
             # NOTE(review): sFailure starts out as "" and is only ever assigned
             # strings in this method, so this test is always true here.
             if sFailure is not None:
                 self.t3oReply = (0, 'EXECFAIL', sFailure);
             rc = None;
     else:
         rc = None;

     # Cleanup.
     for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
         if o is not None and not isinstance(o, basestring):
             del o.uTxsClientCrc32;      # pylint: disable=E1103
             # Make sure all files are closed
             o.close();                  # pylint: disable=E1103
     reporter.log('taskExecEx: returns %s' % (rc));
     return rc;
909
910 #
911 # Admin tasks
912 #
913
914 def hlpRebootShutdownWaitForAck(self, sCmd):
915 """Wait for reboot/shutodwn ACK."""
916 rc = self.recvAckLogged(sCmd);
917 if rc is True:
918 # poll a little while for server to disconnect.
919 uMsStart = base.timestampMilli();
920 while self.oTransport.isConnectionOk() \
921 and base.timestampMilli() - uMsStart >= 5000:
922 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
923 break;
924 self.oTransport.disconnect();
925 return rc;
926
927 def taskReboot(self):
928 rc = self.sendMsg('REBOOT');
929 if rc is True:
930 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
931 return rc;
932
933 def taskShutdown(self):
934 rc = self.sendMsg('SHUTDOWN');
935 if rc is True:
936 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
937 return rc;
938
939 #
940 # CD/DVD control tasks.
941 #
942
943 ## TODO
944
945 #
946 # File system tasks
947 #
948
949 def taskMkDir(self, sRemoteDir, fMode):
950 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
951 if rc is True:
952 rc = self.recvAckLogged('MKDIR');
953 return rc;
954
955 def taskMkDirPath(self, sRemoteDir, fMode):
956 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
957 if rc is True:
958 rc = self.recvAckLogged('MKDRPATH');
959 return rc;
960
961 def taskMkSymlink(self, sLinkTarget, sLink):
962 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
963 if rc is True:
964 rc = self.recvAckLogged('MKSYMLNK');
965 return rc;
966
967 def taskRmDir(self, sRemoteDir):
968 rc = self.sendMsg('RMDIR', (sRemoteDir,));
969 if rc is True:
970 rc = self.recvAckLogged('RMDIR');
971 return rc;
972
973 def taskRmFile(self, sRemoteFile):
974 rc = self.sendMsg('RMFILE', (sRemoteFile,));
975 if rc is True:
976 rc = self.recvAckLogged('RMFILE');
977 return rc;
978
979 def taskRmSymlink(self, sRemoteSymlink):
980 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
981 if rc is True:
982 rc = self.recvAckLogged('RMSYMLNK');
983 return rc;
984
985 def taskRmTree(self, sRemoteTree):
986 rc = self.sendMsg('RMTREE', (sRemoteTree,));
987 if rc is True:
988 rc = self.recvAckLogged('RMTREE');
989 return rc;
990
991 #def "CHMOD "
992 #def "CHOWN "
993 #def "CHGRP "
994
995 def taskIsDir(self, sRemoteDir):
996 rc = self.sendMsg('ISDIR', (sRemoteDir,));
997 if rc is True:
998 rc = self.recvTrueFalse('ISDIR');
999 return rc;
1000
1001 def taskIsFile(self, sRemoteFile):
1002 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1003 if rc is True:
1004 rc = self.recvTrueFalse('ISFILE');
1005 return rc;
1006
1007 def taskIsSymlink(self, sRemoteSymlink):
1008 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1009 if rc is True:
1010 rc = self.recvTrueFalse('ISSYMLNK');
1011 return rc;
1012
1013 #def "STAT "
1014 #def "LSTAT "
1015 #def "LIST "
1016
def taskUploadFile(self, sLocalFile, sRemoteFile):
    """
    Uploads the local file sLocalFile to sRemoteFile.

    The local file is opened for binary reading first, so a missing file is
    detected before TXS is bothered.  Returns False (logged) if the file
    cannot be opened, otherwise whatever taskUploadCommon returns.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile,));
        return False;

    # Common cause with taskUploadString.  The finally clause makes sure the
    # file handle is released even when the common worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile);
    finally:
        oLocalFile.close();
1034
def taskUploadString(self, sContent, sRemoteFile):
    """
    Uploads sContent as the content of sRemoteFile.

    The string is wrapped in a small object offering read(cbMax) so that
    taskUploadCommon can consume it the same way it consumes a file.
    Returns what taskUploadCommon returns.
    """
    class InStringFile(object): # pylint: disable=R0903
        """ Sequential reader over a string, tracking the current offset. """
        def __init__(self, sContent):
            self.off      = 0;
            self.sContent = sContent;

        def read(self, cbMax):
            """ Returns the next chunk of at most cbMax items, "" when exhausted. """
            cbLeft = len(self.sContent) - self.off;
            if cbLeft == 0:
                return "";
            offEnd = self.off + min(cbLeft, cbMax);
            sRet   = self.sContent[self.off:offEnd];
            self.off += len(sRet);
            return sRet;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile);
1055
def taskUploadCommon(self, oLocalFile, sRemoteFile):
    """
    Common worker used by taskUploadFile and taskUploadString.

    Announces the upload with 'PUT FILE' and, once that is acknowledged,
    reads oLocalFile in chunks of up to 64 KB.  Each chunk goes out in a
    data packet together with the CRC-32 of everything read so far (masked
    to 32 bits); an empty read is sent as 'DATA EOF'.  Every packet must be
    acknowledged before the next one is read.

    If reading oLocalFile raises, an ABORT is sent and False is returned.
    Returns True when the EOF packet was acknowledged.
    """
    # Command + ACK.
    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
    if rc is True:
        rc = self.recvAckLogged('PUT FILE');
    if rc is not True:
        return rc;

    #
    # Push data packets until eof.
    #
    uMyCrc32 = zlib.crc32("");
    while True:
        # Read up to 64 KB of data.
        try:
            sChunk = oLocalFile.read(65536);
        except:
            rc = None;
            break;

        # Convert to a byte array, one item at a time.
        abBuf = array.array('B');
        for chItem in sChunk:
            abBuf.append(ord(chItem));
        sChunk = None;
        fEof   = len(abBuf) == 0;

        # Update the file stream CRC and send it off.
        uMyCrc32  = zlib.crc32(abBuf, uMyCrc32);
        uCrcField = long(uMyCrc32 & 0xffffffff);
        if fEof:
            rc = self.sendMsg('DATA EOF', (uCrcField, ));
        else:
            rc = self.sendMsg('DATA ', (uCrcField, abBuf));
        if rc is False:
            break;

        # Wait for the reply.
        rc = self.recvAck();
        if rc is not True:
            if rc is False:
                reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
            else:
                reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                rc = False;
            break;

        # EOF?
        if fEof:
            break;

    # Send ABORT on ACK and I/O errors.
    if rc is None:
        rc = self.sendMsg('ABORT');
        if rc is True:
            self.recvAckLogged('ABORT');
        rc = False;
    return rc;
1111
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile.

    Returns False (logged) if the local file cannot be opened for writing,
    otherwise the result of taskDownloadCommon.  When that result is False
    the local file is removed again; a failure to remove it is only logged.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile,));
        return False;

    # The finally clause makes sure the file handle is released even when
    # the common worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1128
def taskDownloadString(self, sRemoteFile):
    """
    Downloads sRemoteFile and returns its content as one string.

    The chunks written by taskDownloadCommon are collected in a list and
    joined at the end.  Anything but a True result from taskDownloadCommon
    is returned unchanged.
    """
    class OutStringFile(object): # pylint: disable=R0903
        """ Write-only sink that remembers every chunk it is given. """
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            """ Appends the chunk to the list. """
            self.asContent.append(sBuf);
            return None;

    oSink = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oSink);
    if rc is not True:
        return rc;
    # Joining an empty list yields the empty string.
    return ''.join(oSink.asContent);
1147
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Requests the file with 'GET FILE' and then processes replies until a
    'DATA EOF' packet arrives.  The first four payload bytes of each packet
    hold the CRC-32 of the data received so far; it is compared against
    the locally computed one before the remaining payload is written to
    oLocalFile and the packet is ACKed.

    Malformed packets, CRC mismatches and write errors are answered with a
    NACK and yield False.  Returns True when the final ACK went out.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is not True:
        return rc;

    #
    # Process data packets until eof.
    #
    uMyCrc32 = zlib.crc32("");
    while rc is True:
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is None:
            reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
            rc = None;
            break;

        # Validate.
        sOpcode = sOpcode.rstrip();
        fData   = sOpcode == 'DATA';
        fEof    = sOpcode == 'DATA EOF';
        if not fData and not fEof:
            reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                              % (sOpcode, getSZ(abPayload, 0, "None")));
            rc = False;
            break;
        if fData and len(abPayload) < 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
            rc = None;
            break;
        if fEof and len(abPayload) != 4:
            reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
            rc = None;
            break;

        # Check the CRC (common for both packets).
        uCrc32 = getU32(abPayload, 0);
        if fData:
            uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
        if uCrc32 != (uMyCrc32 & 0xffffffff):
            reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                              % (hex(uMyCrc32), hex(uCrc32)));
            rc = None;
            break;
        if fEof:
            rc = self.sendMsg('ACK');
            break;

        # Finally, push the data to the file.
        try:
            oLocalFile.write(abPayload[4:].tostring());
        except:
            reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
            rc = None;
            break;
        rc = self.sendMsg('ACK');

    # Send NACK on validation and I/O errors.
    if rc is None:
        self.sendMsg('NACK');
        rc = False;
    return rc;
1206
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends an UNPKFILE request carrying (sRemoteFile, sRemoteDir).

    When the send reports True the result of waiting for the logged ACK is
    returned, otherwise the send result is returned unchanged.
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1212
1213 # pylint: enable=C0111
1214
1215
1216 #
1217 # Public methods - generic task queries
1218 #
1219
def isSuccess(self):
    """
    Returns True if the task completed successfully, otherwise False.

    That is: the status string is empty and the task result is neither
    False nor None.  Both values are sampled while holding the task lock.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    return sStatus == "" and oTaskRc is not False and oTaskRc is not None;
1231
def getResult(self):
    """
    Returns the result of a completed task.

    None is returned while the status string is non-empty, i.e. when the
    task has not completed yet or there was no previous task.
    """
    self.lockTask();
    sStatus, oTaskRc = self.sStatus, self.oTaskRc;
    self.unlockTask();
    if sStatus == "":
        return oTaskRc;
    return None;
1244
def getLastReply(self):
    """
    Returns the last reply three-tuple (cbMsg, sOpcode, abPayload), read
    under the task lock.

    If there was no last reply the three-tuple is (None, None, None).
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1254
1255 #
1256 # Public methods - connection.
1257 #
1258
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a disconnect task (taskBye, task name "bye").

    Returns True if the task was started, False if not (logged).

    The task itself returns True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1268
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDisconnect: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncDisconnect;
    return self.asyncToSync(fnStarter, cMsTimeout, fIgnoreErrors);
1272
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The task used to be started under the name "bye", a leftover from
    # copying asyncDisconnect; give it its own name.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid);
1282
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUuid: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncUuid;
    return self.asyncToSync(fnStarter, cMsTimeout, fIgnoreErrors);
1286
1287 #
1288 # Public methods - execution.
1289 #
1290
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts a task (taskExecEx, task name "exec") that executes a process.

    Returns True if the task was started, False if not (logged).

    Task result:
        - True if the process exited normally with status code 0.
        - None on failure before the process was executed.
        - False if the process exited with another status or abnormally.
    None and False are logged, and getLastReply() gives further details.

    oStdIn, oStdOut, oStdErr and oTestPipe select what happens to the
    respective stream:
        - None:  No special action, the stream goes wherever TXS sends it
                 (and input comes from wherever TXS gets it).
        - '/dev/null':  The bitbucket.
        - A remote filename:  Redirect to/from that file; prefix the name
                 with '>>' to append.
        - A file like object:  Pipe the stream to/from the TXS.  StdIn
                 needs a non-blocking read() method, the others a write()
                 method.  Beware of deadlocks between StdIn and the
                 StdOut/StdErr/TestPipe piping.
    """
    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                 oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1319
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx: the starter and its arguments are
    handed to asyncToSync and its result is returned.  Note that all four
    streams default to '/dev/null' here.
    """
    fnStarter = self.asyncExecEx;
    return self.asyncToSync(fnStarter, sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
                            oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1327
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '',
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts a task (taskExecEx, task name "exec") that executes a test
    process.

    Returns True if the task was started, False if not (logged).

    Task result:
        - True if the process exited normally with status code 0.
        - None on failure before the process was executed.
        - False if the process exited with another status or abnormally.
    None and False are logged, and getLastReply() gives further details.

    Standard input is '/dev/null'.  Standard output and standard error go
    to reporter.FileWrapper objects named '<sPrefix>stdout' and
    '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe object, unless fWithTestPipe is false in
    which case it is '/dev/null' too.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    if fWithTestPipe:
        oTestPipe = reporter.FileWrapperTestPipe();
    else:
        oTestPipe = '/dev/null';

    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs);
1354
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExec: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncExec;
    return self.asyncToSync(fnStarter, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix,
                            cMsTimeout, fIgnoreErrors);
1360
1361 #
1362 # Public methods - system control (reboot and shutdown)
1363 #
1364
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a reboot task (taskReboot, task name "reboot").

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).  On
    successful task completion the session will be disconnected.
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, tNoArgs);
1375
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncReboot: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncReboot;
    return self.asyncToSync(fnStarter, cMsTimeout, fIgnoreErrors);
1379
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a shutdown task (taskShutdown, task name "shutdown").

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tNoArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, tNoArgs);
1389
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncShutdown: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncShutdown;
    return self.asyncToSync(fnStarter, cMsTimeout, fIgnoreErrors);
1393
1394
1395 #
1396 # Public methods - file system
1397 #
1398
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a mkdir task (taskMkDir, task name "mkDir") for sRemoteDir.
    The mode is passed on as a long.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, tTaskArgs);
1408
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDir: the starter and its arguments (the
    mode converted to a long) are handed to asyncToSync and its result is
    returned.
    """
    fnStarter = self.asyncMkDir;
    return self.asyncToSync(fnStarter, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1412
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a mkdir -p task (taskMkDirPath, task name "mkDirPath") for
    sRemoteDir.  The mode is passed on as a long.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, tTaskArgs);
1422
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDirPath: the starter and its arguments
    (the mode converted to a long) are handed to asyncToSync and its
    result is returned.
    """
    fnStarter = self.asyncMkDirPath;
    return self.asyncToSync(fnStarter, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1426
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a symlink task (taskMkSymlink, task name "mkSymlink") with the
    arguments (sLinkTarget, sLink).

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, tTaskArgs);
1436
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkSymlink: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncMkSymlink;
    return self.asyncToSync(fnStarter, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1440
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a rmdir task (taskRmDir, task name "rmDir") for sRemoteDir.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, tTaskArgs);
1450
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmDir: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncRmDir;
    return self.asyncToSync(fnStarter, sRemoteDir, cMsTimeout, fIgnoreErrors);
1454
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a rmfile task (taskRmFile, task name "rmFile") for sRemoteFile.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, tTaskArgs);
1464
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmFile: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncRmFile;
    return self.asyncToSync(fnStarter, sRemoteFile, cMsTimeout, fIgnoreErrors);
1468
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a rmsymlink task (taskRmSymlink, task name "rmSymlink") for
    sRemoteSymlink.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, tTaskArgs);
1478
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmSymlink: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncRmSymlink;
    return self.asyncToSync(fnStarter, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1482
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a rmtree task (taskRmTree, task name "rmTree") for sRemoteTree.

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, tTaskArgs);
1492
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncRmTree: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncRmTree;
    return self.asyncToSync(fnStarter, sRemoteTree, cMsTimeout, fIgnoreErrors);
1496
1497 #def "CHMOD "
1498 #def "CHOWN "
1499 #def "CHGRP "
1500
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an is-dir query task (taskIsDir, task name "isDir") for
    sRemoteDir.

    Returns True if the task was started, False if not (logged).

    The task returns True if it's a directory, False if it isn't, and None
    on error (logged).
    """
    tTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, tTaskArgs);
1511
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsDir: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncIsDir;
    return self.asyncToSync(fnStarter, sRemoteDir, cMsTimeout, fIgnoreErrors);
1515
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an is-file query task (taskIsFile, task name "isFile") for
    sRemoteFile.

    Returns True if the task was started, False if not (logged).

    The task returns True if it's a file, False if it isn't, and None on
    error (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, tTaskArgs);
1526
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsFile: the starter and its arguments are
    handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncIsFile;
    return self.asyncToSync(fnStarter, sRemoteFile, cMsTimeout, fIgnoreErrors);
1530
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an is-symbolic-link query task (taskIsSymlink, task name
    "isSymlink") for sRemoteSymlink.

    Returns True if the task was started, False if not (logged).

    The task returns True if it's a symbolic link, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, tTaskArgs);
1541
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncIsSymlink: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncIsSymlink;
    return self.asyncToSync(fnStarter, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1545
1546 #def "STAT "
1547 #def "LSTAT "
1548 #def "LIST "
1549
def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a file upload task (taskUploadFile, task name "upload") with
    the arguments (sLocalFile, sRemoteFile).

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sLocalFile, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, tTaskArgs);
1559
def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadFile: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncUploadFile;
    return self.asyncToSync(fnStarter, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1563
def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an upload string task (taskUploadString, task name
    "uploadString") with the arguments (sContent, sRemoteFile).

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sContent, sRemoteFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, tTaskArgs);
1573
def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadString: the starter and its
    arguments are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncUploadString;
    return self.asyncToSync(fnStarter, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1577
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a download file task (taskDownloadFile, task name
    "downloadFile") with the arguments (sRemoteFile, sLocalFile).

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, tTaskArgs);
1587
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDownloadFile: the starter and its
    arguments are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncDownloadFile;
    return self.asyncToSync(fnStarter, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1591
def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a download string task (taskDownloadString, task name
    "downloadString") for sRemoteFile.

    Returns True if the task was started, False if not (logged).

    The task returns a byte string on success and False on failure
    (logged).
    """
    tTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, tTaskArgs);
1601
def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncDownloadString: the starter and its
    arguments are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncDownloadString;
    return self.asyncToSync(fnStarter, sRemoteFile, cMsTimeout, fIgnoreErrors);
1605
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an unpack file task (taskUnpackFile, task name "unpackFile")
    with the arguments (sRemoteFile, sRemoteDir).

    Returns True if the task was started, False if not (logged).

    The task returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, tTaskArgs);
1616
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncUnpackFile: the starter and its arguments
    are handed to asyncToSync and its result is returned.
    """
    fnStarter = self.asyncUnpackFile;
    return self.asyncToSync(fnStarter, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1620
1621
1622class TransportTcp(TransportBase):
1623 """
1624 TCP transport layer for the TXS client session class.
1625 """
1626
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Stores the connection parameters and resets the connection state.  No
    connection is made here; the session calls back later, on its worker
    thread, to do that.

    When uPort is None it defaults to 5042 if fReversedSetup is False and
    to 5048 otherwise.
    """
    TransportBase.__init__(self, utils.getCallerName());

    if uPort is None:
        uPort = 5042 if fReversedSetup is False else 5048;

    # Parameters.
    self.sHostname        = sHostname;
    self.fReversedSetup   = fReversedSetup;
    self.uPort            = uPort;

    # Connection state.
    self.oSocket          = None;
    self.oWakeupW         = None;
    self.oWakeupR         = None;
    self.fConnectCanceled = False;
    self.fIsConnecting    = False;
    self.oCv              = threading.Condition();
    self.abReadAhead      = array.array('B');
1643
def toString(self):
    """
    Returns a string with the base class string and the current values of
    the transport's connection attributes.
    """
    sFmt = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
           ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>';
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
    return sFmt % tValues;
1649
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    Returns True if oXcpt is a socket.error whose error number is
    EINPROGRESS or EWOULDBLOCK (the latter presumably being what Windows
    reports), otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;

    # Prefer the errno attribute: indexing the exception object (oXcpt[0])
    # is not supported by every Python version, and the old blanket
    # 'except: pass' turned that into a silent 'no match'.  Fall back on the
    # first constructor argument so that a single argument
    # socket.error(iErrNo) keeps matching.
    iErrNo = getattr(oXcpt, 'errno', None);
    if iErrNo is None:
        tArgs  = getattr(oXcpt, 'args', ());
        iErrNo = tArgs[0] if tArgs else None;
    if iErrNo is None:
        return False;

    # Look the constants up softly in case one is missing on this platform.
    for sName in ('EINPROGRESS', 'EWOULDBLOCK'):
        iWanted = getattr(errno, sName, None);
        if iWanted is not None and iErrNo == iWanted:
            return True;
    return False;
1666
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    Returns True if oXcpt is a socket.error whose error number is
    EWOULDBLOCK or EAGAIN, otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;

    # Prefer the errno attribute: indexing the exception object (oXcpt[0])
    # is not supported by every Python version, and the old blanket
    # 'except: pass' turned that into a silent 'no match'.  Fall back on the
    # first constructor argument so that a single argument
    # socket.error(iErrNo) keeps matching.
    iErrNo = getattr(oXcpt, 'errno', None);
    if iErrNo is None:
        tArgs  = getattr(oXcpt, 'args', ());
        iErrNo = tArgs[0] if tArgs else None;
    if iErrNo is None:
        return False;

    # Look the constants up softly in case one is missing on this platform.
    for sName in ('EWOULDBLOCK', 'EAGAIN'):
        iWanted = getattr(errno, sName, None);
        if iWanted is not None and iErrNo == iWanted:
            return True;
    return False;
1682
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    Returns True if oXcpt is a socket.error whose error number is
    ECONNRESET or ENETRESET, otherwise False.
    """
    if not isinstance(oXcpt, socket.error):
        return False;

    # Prefer the errno attribute: indexing the exception object (oXcpt[0])
    # is not supported by every Python version, and the old blanket
    # 'except: pass' turned that into a silent 'no match'.  Fall back on the
    # first constructor argument so that a single argument
    # socket.error(iErrNo) keeps matching.
    iErrNo = getattr(oXcpt, 'errno', None);
    if iErrNo is None:
        tArgs  = getattr(oXcpt, 'args', ());
        iErrNo = tArgs[0] if tArgs else None;
    if iErrNo is None:
        return False;

    # Look the constants up softly in case one is missing on this platform.
    for sName in ('ECONNRESET', 'ENETRESET'):
        iWanted = getattr(errno, sName, None);
        if iWanted is not None and iErrNo == iWanted:
            return True;
    return False;
1698
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets.  Caller should own the CV.

    Each of the two socket attributes is reset to None before the socket
    it referred to, if any, is closed; the read end goes first.
    Returns None.
    """
    for sAttr in ('oWakeupR', 'oWakeupW'):
        oWakeup = getattr(self, sAttr);
        setattr(self, sAttr, None);
        if oWakeup is not None:
            oWakeup.close();
    return None;
1712
def cancelConnect(self):
    """
    Flags the connect as canceled.

    If a connect is in progress, the connecting socket is detached from
    the object and closed.  The write end of the wakeup socket pair, if
    there is one, is detached as well, sent a message, shut down for
    writing and closed.  All of it happens while holding the CV.
    """
    # This is bad stuff.
    self.oCv.acquire();
    reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
    self.fConnectCanceled = True;
    if self.fIsConnecting:
        oSocket, self.oSocket = self.oSocket, None;
        if oSocket is not None:
            reporter.log2('TransportTcp::cancelConnect: closing the socket');
            oSocket.close();

        oWakeupW, self.oWakeupW = self.oWakeupW, None;
        if oWakeupW is not None:
            reporter.log2('TransportTcp::cancelConnect: wakeup call');
            try:
                oWakeupW.send('cancelled!\n');
            except:
                reporter.logXcpt();
            try:
                oWakeupW.shutdown(socket.SHUT_WR);
            except:
                reporter.logXcpt();
            oWakeupW.close();
    self.oCv.release();
1735
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    oSocket is bound to (self.sHostname, self.uPort) and put into listening
    mode.  If the first accept() reports that the operation is in progress
    we wait in select() on oSocket and oWakeupR for up to cMsTimeout
    milliseconds and then try accept() once more.

    Returns True when a client was accepted, False when the second
    accept() still reported 'in progress' (timeout), and None on
    cancellation or failure (logged).  socket.error exceptions that are
    neither 'in progress' nor the result of a cancellation are re-raised.

    On success, and unless the connect was canceled in the meantime, the
    listening socket in self.oSocket is closed and replaced by the client
    socket, and self.sHostname is set to the client's "address:port".
    """
    assert(self.fReversedSetup);

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
    except:
        # This used to be reported as a listen() failure.
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
        return None;

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort));
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
        return None;
    try:
        oSocket.listen(1);
    except:
        reporter.errorXcpt('socket.listen(1) failed');
        return None;

    # Accept connections.
    oClientSocket = None;
    tClientAddr   = None;
    try:
        (oClientSocket, tClientAddr) = oSocket.accept();
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise;

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,));
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
        except socket.error as oXcptSelect:
            # EBADF is what we get when cancelConnect closed the socket.
            if oXcptSelect.args[0] != errno.EBADF or not self.fConnectCanceled:
                raise;
            reporter.log('socket.select() on accept was canceled');
            return None;
        except:
            reporter.logXcpt('socket.select() on accept');

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptAccept:
            if not self.__isInProgressXcpt(oXcptAccept):
                if oXcptAccept.args[0] != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.accept() was canceled');
                return None;
            reporter.log('socket.accept() timed out');
            return False;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;
    except:
        reporter.errorXcpt('socket.accept() failed');
        return None;

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire();
    if not self.fConnectCanceled:
        self.oSocket.close();
        self.oSocket   = oClientSocket;
        self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
    else:
        # Canceled while accepting: nobody takes over the client socket, so
        # close it here rather than leaving it to the garbage collector.
        try:
            oClientSocket.close();
        except:
            reporter.logXcpt();
    self.oCv.release();
    return True;
1808
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Calls connect() on oSocket.  If that reports that the operation is in
    progress, we wait in select() for up to cMsTimeout milliseconds for
    the socket to become writable (or for an exceptional condition on the
    socket or on oWakeupR) and then fetch the outcome via SO_ERROR.

    Returns True when connected.  Returns False for the error codes
    treated as 'try again' (ECONNREFUSED, EHOSTUNREACH, EINTR, ENETDOWN,
    ENETUNREACH, ETIMEDOUT).  Other errors are logged as fatal, except for
    EBADF after a cancellation.
    """
    assert(not self.fReversedSetup);

    # Connect w/ timeouts.
    rc = None;
    try:
        oSocket.connect((self.sHostname, self.uPort));
        rc = True;
    except socket.error as oXcpt:
        iRc = oXcpt.args[0];
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out');
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                rc = iRc == 0;
            except socket.error as oXcptSelect:
                iRc = oXcptSelect.args[0];
            except:
                iRc = -42;
                reporter.fatalXcpt('socket.select() on connect failed');

        if rc is not True:
            fTryAgain = (   iRc == errno.ECONNREFUSED
                         or iRc == errno.EHOSTUNREACH
                         or iRc == errno.EINTR
                         or iRc == errno.ENETDOWN
                         or iRc == errno.ENETUNREACH
                         or iRc == errno.ETIMEDOUT);
            if fTryAgain:
                rc = False; # try again.
            elif iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
    return rc;
1851
1852
def connect(self, cMsTimeout):
    """
    Creates a non-blocking TCP socket and connects it, as server in the
    reversed setup and as client otherwise.

    Where socket.socketpair() is available, a wakeup socket pair is created
    too (select doesn't wake up on socket close on Linux); its write end is
    what cancelConnect uses to interrupt the wait.

    Returns the result of _connectAsServer / _connectAsClient, except that
    True is turned into False if the connect was canceled in the meantime.
    Returns None if the socket could not be set up or if the connect was
    already canceled on entry.  Unless the result is True, the socket and
    the wakeup sockets are closed before returning.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
    except:
        reporter.fatalXcpt('socket.socket() failed');
        return None;
    try:
        oSocket.setblocking(0);
    except:
        # Log first, while the exception is still the current one, and name
        # the call that actually failed.
        reporter.fatalXcpt('socket.setblocking(0) failed');
        oSocket.close();
        return None;

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None;
    oWakeupW = None;
    if hasattr(socket, 'socketpair'):
        try:
            (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=E1101
        except:
            reporter.logXcpt('socket.socketpair() failed');

    # Update the state.
    self.oCv.acquire();
    rc = None;
    if not self.fConnectCanceled:
        self.oSocket       = oSocket;
        self.oWakeupW      = oWakeupW;
        self.oWakeupR      = oWakeupR;
        self.fIsConnecting = True;
        self.oCv.release();

        # Try connect.
        if oWakeupR is None:
            oWakeupR = oSocket; # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
        oSocket = None;

        # Update the state and cleanup on failure/cancel.
        self.oCv.acquire();
        if rc is True and self.fConnectCanceled:
            rc = False;
        self.fIsConnecting = False;
    else:
        # Canceled before we got going.  The new sockets were never handed
        # over to the object, so the cleanup below will not see them; close
        # them here instead of leaking them.
        for oUnused in (oSocket, oWakeupR, oWakeupW):
            if oUnused is not None:
                try:
                    oUnused.close();
                except:
                    reporter.logXcpt();

    if rc is not True:
        if self.oSocket is not None:
            self.oSocket.close();
            self.oSocket = None;
        self._closeWakeupSockets();
    self.oCv.release();

    reporter.log2('TransportTcp::connect: returning %s' % (rc,));
    return rc;
1909
def disconnect(self, fQuiet = False):
    """
    Closes the connection, if there is one, and the wakeup sockets.

    With a socket present, the read-ahead buffer is emptied, the socket is
    shut down for writing (a failure is reported unless fQuiet is set) and
    drained in non-blocking mode until recv() returns nothing or raises.
    The socket is then closed while holding the CV.
    """
    if self.oSocket is None:
        self.oCv.acquire();
    else:
        self.abReadAhead = array.array('B');

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR);
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)');
        try:
            self.oSocket.setblocking(0); # just in case it's not set.
            while True:
                if len(self.oSocket.recv(16384)) == 0:
                    break;
        except:
            pass;

        # Close it.
        self.oCv.acquire();
        try:
            self.oSocket.setblocking(1);
        except:
            pass;
        self.oSocket.close();
        self.oSocket = None;

    self._closeWakeupSockets();
    self.oCv.release();
1938
def sendBytes(self, abMsg, cMsTimeout):
    """
    Sends all of abMsg over the socket.

    One plain send() is tried first.  If that doesn't take everything, or
    would block, the rest is sent in a loop that waits in select() for the
    socket to become writable, with cMsTimeout milliseconds as the overall
    limit.

    Returns True once everything was sent, and False (logged) when there is
    no connection, on timeout, or on errors other than 'would block'.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbSent = self.oSocket.send(abMsg);
        if cbSent == len(abMsg):
            return True;
    except Exception as oXcpt:
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
            return False;
        cbSent = 0;

    # Do a timed send.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abMsg)));
            return False;

        # wait.
        try:
            ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            cWritable = len(ttRc[1]);
            if len(ttRc[2]) > 0 and cWritable == 0:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                return False;
            if cWritable == 0:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abMsg)));
                return False;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            return False;

        # Try send more.
        try:
            cbSent += self.oSocket.send(abMsg[cbSent:]);
            if cbSent == len(abMsg):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
                return False;
1987
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from self.abReadAhead and returns them.  The
    caller must make sure that at least cb bytes are buffered.
    """
    abBuffered = self.abReadAhead;
    assert len(abBuffered) >= cb;
    self.abReadAhead = abBuffered[cb:];
    return abBuffered[:cb];
1994
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes from the connection socket.

    Incoming data is accumulated in self.abReadAhead, so a partially
    received request stays buffered for the next call.  One read is
    attempted right away; if that is not enough, a select() driven loop
    keeps reading until cb bytes are buffered or cMsTimeout milliseconds
    have elapsed.

    Returns the first cb buffered bytes on success and None on failure
    (no connection, timeout, select/recv error, peer shut down).  When the
    peer has shut the connection down, disconnect() is called.

    With fNoDataOk set, a timeout (or connection reset) while nothing is
    buffered is not reported as an error; None is still returned.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # First a quick read without any timeout handling.
    if len(self.abReadAhead) < cb:
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if abChunk:
                self.abReadAhead.extend(array.array('B', abChunk));
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            if not fNoDataOk or len(self.abReadAhead) > 0:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            return None;

        # Wait for the socket to become readable.
        try:
            aoReadable, _, aoXcpt = select.select([self.oSocket], [], [self.oSocket],
                                                  (cMsTimeout - cMsElapsed) / 1000.0);
            if not aoReadable:
                if aoXcpt:
                    reporter.error('TranportTcp.recvBytes: select returned with exception');
                elif not fNoDataOk or len(self.abReadAhead) > 0:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                return None;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            return None;

        # Read more.  An empty read means the peer has shut the connection down.
        try:
            abChunk = self.oSocket.recv(cb - len(self.abReadAhead));
            if len(abChunk) == 0:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abChunk));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or len(self.abReadAhead) > 0:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                return None;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);
2062
def isConnectionOk(self):
    """
    Probes whether the connection socket still looks usable.

    Returns False if there is no socket, if select() flags the socket as
    being in an exceptional state, or if polling / sending zero bytes
    raises.  Returns True otherwise.
    """
    if self.oSocket is None:
        return False;
    try:
        _, _, aoXcpt = select.select([], [], [self.oSocket], 0.0);
        if len(aoXcpt) == 0:
            self.oSocket.send(array.array('B')); # send zero bytes.
            return True;
    except:
        pass;
    return False;
2075
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the connection socket becomes readable within cMsTimeout
    milliseconds.

    Returns False only when select() completes without reporting the socket
    as readable.  Returns True when it is readable, and also when select()
    raises (e.g. because there is no socket).
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    return len(aoReadable) > 0;
2084
2085
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    Builds a TransportTcp from sHostname, uPort and fReversedSetup and wraps
    it in a Session created with cMsTimeout and cMsIdleFudge.

    Returns the Session object, or None if creating the transport or the
    session raised (the exception is reported via reporter.errorXcpt).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)'
                  % (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2099
2100
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its
    name.

    Same as openTcpSession, except that the Session is created with
    fTryConnect = True so that a connection failure is not logged as an
    error.  (NOTE(review): that behaviour lives in Session, which is not
    visible from here - confirm there.)

    Returns the Session object, or None if creating the transport or the
    session raised (the exception is reported via reporter.errorXcpt).
    """
    try:
        return Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge,
                       fTryConnect = True);
    except:
        reporter.errorXcpt(None, 15);
    return None;
2115
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette