VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 68236

Last change on this file since 68236 was 65967, checked in by vboxsync, 8 years ago

ValidationKit/testdriver: pylint 2.0.0 fixes.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 79.2 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 65967 2017-03-07 10:54:16Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2016 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 65967 $"
30
31# Standard Python imports.
32import array
33import errno
34import os
35import select
36import socket
37import threading
38import time
39import types
40import zlib
41import uuid
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49#
50# Helpers for decoding data received from the TXS.
# These are used by both the Session and Transport classes.
52#
53
def getU32(abData, off):
    """
    Decode an unsigned 32-bit little endian field.

    abData is an indexable sequence of byte values and off is the index of
    the least significant byte.  The four values at off thru off + 3 are
    combined, with the one at off + 3 being the most significant.
    """
    u32 = 0;
    # Walk from the most significant byte down, shifting in one byte a time.
    for offByte in (off + 3, off + 2, off + 1, off):
        u32 = u32 * 256 + abData[offByte];
    return u32;
60
def getSZ(abData, off, sDefault = None):
    """
    Get a zero-terminated UTF-8 string field.

    abData:     The packet data (array of byte values).
    off:        Offset of the first byte of the string.
    sDefault:   What to return when no valid string can be extracted.

    Returns the decoded string, terminator excluded.
    Returns sDefault if getSZLen cannot find the end of the string or if the
    bytes cannot be decoded (the latter with details in the log).
    """
    cchStr = getSZLen(abData, off);
    if cchStr >= 0:
        abStr = abData[off:(off + cchStr)];
        try:
            # Go via bytearray rather than array.tostring(): the latter was
            # removed in Python 3.9 while array.tobytes() does not exist in
            # Python 2.  bytearray() and its decode() exist in both.
            return bytearray(abStr).decode('utf_8');
        except Exception:
            reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;
74
def getSZLen(abData, off):
    """
    Get the length of a zero-terminated string field, in bytes.

    The terminator itself is not counted.

    Returns -1 if off is beyond the data packet or if no terminator is found
    before the end of the packet.
    """
    cbData = len(abData);
    # An off at or beyond cbData gives an empty range, i.e. -1.
    for offCur in range(off, cbData):
        if abData[offCur] == 0:
            return offCur - off;
    return -1;
91
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters (A-Z); the remaining six may in addition be
    digits, dash, underscore or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # Check every character.  The ranges used to be range(0, 1) and
    # range(2, 7), which left characters #1 and #7 unchecked.
    for i, ch in enumerate(sOpcode):
        sValidChars = sSet1 if i < 2 else sSet2;
        if ch not in sValidChars:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
112
113#
114# Helper for encoding data sent to the TXS.
115#
116
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a little endian byte (B) array.

    Returns an array.array('B') with four entries, least significant byte
    first.  Only the low 32 bits of u32 are used, so a negative value (like
    a signed CRC-32) is encoded as its two's complement bit pattern.
    """
    # Use shifts and masks instead of '/': the latter produces floats under
    # true division (Python 3 or 'from __future__ import division'), which
    # array('B') rejects.  For integers the result is the same as before.
    return array.array('B',
                       (  u32        & 0xff,
                         (u32 >>  8) & 0xff,
                         (u32 >> 16) & 0xff,
                         (u32 >> 24) & 0xff) );
124
125
126
class TransportBase(object):
    """
    Base class for the transport layer.

    Sub-classes override the byte level primitives (connect, disconnect,
    sendBytes, recvBytes, isConnectionOk, isRecvPending).  The message
    framing (sendMsgInt, recvMsg, sendMsg) is implemented here on top of
    those primitives.

    A message consists of a 16 byte header (u32 length, u32 CRC-32 and an
    8 character opcode) followed by the payload, zero padded to a multiple
    of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header of a message whose payload could not be received.  recvMsg
        # uses it on the next call instead of reading a new header.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters.  It is space padded to 8.
        cMsTimeout: The timeout handed on to sendBytes.
        abPayload:  The encoded payload (array('B')), None means no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # None rather than a shared array instance is the default so that no
        # object is shared between calls.
        if abPayload is None:
            abPayload = array.array('B');

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0)); # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except Exception:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # checksum it, pad it and send it off.  The CRC covers the opcode and
        # the payload but not the padding.  It is masked because zlib.crc32
        # returns a signed value on Python 2.
        uCrc32 = zlib.crc32(abMsg[8:]) & 0xffffffff;
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: The timeout handed on to recvBytes.
        fNoDataOk:  Handed on to recvBytes when reading the header.  When
                    set, a failure to get the payload is not logged here.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, unless we have got one left over from a previous
        # call that failed to get the payload.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
            if abHdr is None:
                return (None, None, None);
            if len(abHdr) != 16:
                reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
                return (None, None, None);

        # Unpack and validate the header.
        cbMsg = getU32(abHdr, 0);
        uCrc32 = getU32(abHdr, 4);
        try:
            # bytearray works on both Python 2 and 3 (array.tostring() was
            # removed in 3.9).  Non-ASCII bytes must not escape as exception.
            sOpcode = bytearray(abHdr[8:16]).decode('ascii');
        except UnicodeError:
            reporter.fatal('recvMsg: invalid opcode (not ASCII): %s' % (abHdr[8:16].tolist(),));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            cbPadding = (16 - (cbMsg % 16)) % 16;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            if cbPadding > 0:
                del abPayload[-cbPadding:];

        # Check the CRC-32.  A zero CRC in the header means it is not checked.
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Each aoPayload entry is encoded according to its type: strings as
        zero-terminated UTF-8, long integers as little endian uint32_t, and
        byte arrays are appended as they are.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if isinstance(o, basestring):
                    abPayload.extend(bytearray(o.encode('utf_8')));
                    abPayload.append(0);
                elif isinstance(o, types.LongType):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except Exception:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
374
375
376class Session(TdTaskBase):
377 """
378 A Test eXecution Service (TXS) client session.
379 """
380
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
    """
    Construct a TXS session.

    The constructor starts the "connecting" task (taskConnect) with the
    given timeout, handing it cMsIdleFudge.

    Raises base.GenError if the connect task could not be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName());

    # Connection related state.
    self.oTransport = oTransport;
    self.fTryConnect = fTryConnect;
    self.fScrewedUpMsgState = False;

    # State of the current task, see startTask and taskThread.
    self.sStatus = "";
    self.cMsTimeout = 0;
    self.msStart = 0;
    self.fErr = True; # Whether to report errors as error.
    self.oThread = None;
    self.fnTask = self.taskDummy;
    self.aTaskArgs = None;
    self.oTaskRc = None;
    self.t3oReply = (None, None, None);

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
404
def __del__(self):
    """
    Destructor.

    Cancels the task (if any) via cancelTask; the result is ignored.
    """
    _ = self.cancelTask();
408
def toString(self):
    """
    Stringify the session for logging and debugging, task state included.
    """
    sFmt = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
           ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    tValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    );
    return sFmt % tValues;
414
def taskDummy(self):
    """
    Task method place holder.

    This is what fnTask is set to by the constructor.  It always raises an
    exception so that broken state handling gets noticed.
    """
    oXcpt = Exception();
    raise oXcpt;
418
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks off a new task on a new daemon thread.

    cMsTimeout: The task timeout in milliseconds. Values less than
                500 ms will be adjusted to 500 ms. This means it is
                OK to use negative value.
    fIgnoreErrors: When set, failures are not reported as errors (it is
                inverted and passed to reporter.maybeErr and stored as
                self.fErr).
    sStatus: The task status.
    fnTask: The method that'll execute the task.
    aArgs: Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    # Any previous task must be out of the way first.
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Change status to "setup" while holding the lock.  A non-empty status
    # at this point means somebody else got in between cancelTask and here.
    self.lockTask();
    if self.sStatus != "":
        self.unlockTask();
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;
    self.sStatus = "setup";
    self.oTaskRc = None;
    self.t3oReply = (None, None, None);
    self.resetTaskLocked();
    self.unlockTask();

    # Set up the task parameters and the thread (not started yet).
    self.cMsTimeout = max(cMsTimeout, 500);
    self.fErr = not fIgnoreErrors;
    self.fnTask = fnTask;
    self.aTaskArgs = aArgs;
    self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
    self.oThread.setDaemon(True);
    self.msStart = base.timestampMilli();

    # Publish the real status under the lock, then let the thread loose.
    self.lockTask();
    self.sStatus = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
462
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate after flagging the task as cancelled.

    Returns success indicator (True/False):
      - True if no task is pending, or if the task thread terminated
        within the wait period.
      - False if a task is being set up, if the task is already being
        cancelled, if fSync is clear (the outcome isn't known yet), or if
        the thread is still alive after the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus in ("setup", "cancelled"):
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    # Take a reference while holding the lock, taskThread clears self.oThread.
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);
    # The cancellation succeeded if the thread is gone.  (This used to
    # return the alive state itself, i.e. the inverse of the documented
    # success indicator.  is_alive is used because isAlive was removed in
    # Python 3.9; is_alive is available since 2.6.)
    return not oThread.is_alive();
493
def taskThread(self):
    """
    The task thread function.
    This does some housekeeping activities around the real task method call.

    Unless the task was cancelled before it got going, self.fnTask is
    called with self.aTaskArgs and the return value is stored in
    self.oTaskRc (None if the task method raised an exception or was never
    called).  Afterwards the thread reference and the status are cleared
    and the task is signalled, all while holding the task lock.

    Returns None.
    """
    # Must have a value on all paths: it used to be left unassigned when
    # the task was already cancelled, making the logging below raise an
    # exception while holding the lock and without signalling the task.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            # Deliberately catching everything, the housekeeping below must
            # be done no matter how the task method fails.
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
519
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.

    Returns True if the status is "cancelled" (sampled while holding the
    task lock), otherwise False.
    """
    self.lockTask();
    fCancelled = self.sStatus == "cancelled";
    self.unlockTask();
    return fCancelled;
528
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.

    Returns True if getMsLeft reports that no time is left, else False.
    """
    return self.getMsLeft() <= 0;
535
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the timeout, in milliseconds.

    cMsMin: The lowest value to return.  Also returned if the elapsed time
            comes out negative.
    cMsMax: The highest value to return, only applied when positive.
    """
    cMsElapsed = base.timestampMilli() - self.msStart;
    cMsLeft = self.cMsTimeout - cMsElapsed;
    if cMsElapsed < 0 or cMsLeft <= cMsMin:
        return cMsMin;
    if 0 < cMsMax < cMsLeft:
        return cMsMax;
    return cMsLeft;
547
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that stashes the response away
    so the client can inspect it later on.

    cMsTimeout: The receive timeout.  None means the time left of the task
                timeout, but no less than 500 ms.
    fNoDataOk:  Passed on to recvMsg.

    Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg, which is
    also stored in self.t3oReply (under the task lock).
    """
    if cMsTimeout is None:
        cMsTimeout = self.getMsLeft(500);
    t3oReply = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
    (cbMsg, sOpcode, abPayload) = t3oReply;

    self.lockTask();
    self.t3oReply = (cbMsg, sOpcode, abPayload);
    self.unlockTask();

    return (cbMsg, sOpcode, abPayload);
560
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success.
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
    and there are always details of some sort or another (the opcode is
    used when the payload has no valid string).
    """
    t3oReply = self.recvReply(None, fNoDataOk);
    if t3oReply[0] is None:
        return False;

    sStrippedOpcode = t3oReply[1].strip();
    if sStrippedOpcode != "ACK":
        return (sStrippedOpcode, getSZ(t3oReply[2], 0, sStrippedOpcode));
    return True;
577
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck and logging.

    sCommand:   The command the ACK is for, used in the log messages.
    fNoDataOk:  Passed on to recvAck.  When set, failures are not logged.

    Returns True on success (ACK).
    Returns False on time, transport error and errors signalled by TXS.
    """
    rc = self.recvAck(fNoDataOk);
    if rc is True:
        return True;

    if not fNoDataOk:
        if rc is False:
            reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
        else:
            reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));

    # Always map failures to False.  The (opcode, details) tuple from
    # recvAck used to leak to the caller when fNoDataOk was set, and being
    # a non-empty tuple it tests as true.
    return False;
592
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command the response is for, used in the log messages.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        # (These messages used to be prefixed 'recvAckLogged:'.)
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;
610
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    When cMsTimeout is None, the time left of the task timeout is used,
    though no less than 500 ms.

    Returns whatever the transport's sendMsg returns.
    """
    cMsEffective = cMsTimeout;
    if cMsEffective is None:
        cMsEffective = self.getMsLeft(500);
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload);
618
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation.

    fnAsync is called with aArgs to start the task, then the task is waited
    for (task timeout + 5 seconds).  The task is cancelled if the wait
    fails.

    Returns False on failure, task return status on success.
    """
    fStarted = fnAsync(*aArgs);
    if fStarted is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return fStarted;

    fWaited = self.waitForTask(self.cMsTimeout + 5000);
    if fWaited is not False:
        return self.getResult();

    reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
    self.cancelTask();
    return False;
640
641 #
642 # Connection tasks.
643 #
644
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS.

    Keeps retrying the transport connect, with up to a second of sleep
    between attempts, until it succeeds, fails fatally, the task times out
    or the task is cancelled.

    Returns the taskGreet result when connected.
    Returns None if the transport reports a fatal connect error.
    Returns False on timeout or cancellation.
    """
    while True:
        if self.isCancelled():
            break;

        reporter.log2('taskConnect: connecting ...');
        fRc = self.oTransport.connect(self.getMsLeft(500));
        if fRc is None:
            reporter.log2('taskConnect: unable to connect');
            return None;
        if fRc is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);

        # Retryable failure.  Give up if we're out of time, else take a nap.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);

    # Cancelled.
    if not self.fTryConnect:
        reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
    return False;
665
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS.

    Sends HOWDY and waits for the ACK.  On success it then sleeps one
    second at a time until cMsIdleFudge milliseconds are used up.  On
    failure the transport is disconnected.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg("HOWDY", ());
    if fRc is True:
        fRc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if fRc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return fRc;

    cMsLeftToIdle = cMsIdleFudge;
    while cMsLeftToIdle > 0:
        cMsLeftToIdle -= 1000;
        time.sleep(1);
    return fRc;
678
def taskBye(self):
    """
    Says goodbye to the TXS.

    Sends BYE, waits for the ACK if the sending worked, and disconnects
    the transport.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg("BYE");
    if fRc is True:
        fRc = self.recvAckLogged("BYE");
    self.oTransport.disconnect();
    return fRc;
686
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply is missing, isn't "ACK UUID", or doesn't
    contain a string uuid.UUID accepts (details in the log).
    Returns the sendMsg status if the request could not be sent.
    """
    rc = self.sendMsg("UUID");
    if rc is not True:
        return rc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip()
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Validate it by feeding it to the UUID class.
    sUuid = '{%s}' % (sUuid,)
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
711
712 #
713 # Process task
714 # pylint: disable=C0111
715 #
716
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
    """
    Executes a program via the TXS (EXEC), feeding it standard input and
    collecting its output until a process status message arrives.

    sExecName:  The executable, sent as a string.
    fFlags:     Flags, sent as an uint32_t.
    asArgs:     The arguments, each sent as a string.
    asAddEnv:   Environment changes, each sent as a string.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either a string (sent to the TXS as is), None (an
                empty string is sent), or an object (a '|' is sent).
                An oStdIn object is read() from, the other objects are
                write()n to.  Objects are close()d before returning.
    sAsUser:    The user to run as, sent as a string.

    Returns True if the final status is 'PROC OK', False if another 'PROC'
    status was received, and None on everything else.  On the None path
    self.t3oReply is replaced by a fake (0, 'EXECFAIL', sFailure) reply
    when the main loop was entered.
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if isinstance(o, basestring):
            aoPayload.append(o);
        elif o is not None:
            aoPayload.append('|');
            # Running CRC-32 of the stream, kept on the object itself.
            o.uTxsClientCrc32 = zlib.crc32('');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure = "";
        # Timestamp of the last STDIN/STDINEOS message still lacking a
        # reply, None if there is none outstanding.
        msPendingInputReply = None;
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?  (Only one input message is in flight at a time.)
            if msPendingInputReply is None \
            and oStdIn is not None \
            and not isinstance(oStdIn, basestring):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    oStdIn.uTxsClientCrc32 = zlib.crc32(sInput, oStdIn.uTxsClientCrc32);
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).
                    abInput = array.array('B', sInput);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # Nothing more to read, tell the TXS that the stream ended.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
            if cbMsg is None:
                # Check for time out before restarting the loop.
                # Note! Only doing timeout checking here does mean that
                # the TXS may prevent us from timing out by
                # flooding us with data. This is unlikely though.
                if self.hasTimedOut() \
                and ( msPendingInputReply is None \
                or base.timestampMilli() - msPendingInputReply > 30000):
                    reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                    sFailure = 'timeout';
                    rc = None;
                    break;
                # Check that the connection is OK.
                if not self.oTransport.isConnectionOk():
                    self.oTransport.disconnect();
                    sFailure = 'disconnected';
                    rc = False;
                    break;
                continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process.  The payload starts with the
                # running stream CRC-32, the data follows.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
                    % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored. Ignore this condition for now.
                # NOTE(review): oStdIn is set to None above once STDINEOS
                # has been sent, in which case the del below would fail -
                # confirm whether the TXS can answer STDINEOS with STDINIGN.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
            and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this. If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command. Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode != 'PROC OK':
                # Do proper parsing some other day if needed:
                # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                rc = False;
        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                # Drain replies until the process status shows up.
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not isinstance(o, basestring):
            del o.uTxsClientCrc32; # pylint: disable=E1103
            # Make sure all files are closed
            o.close(); # pylint: disable=E1103
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
918
919 #
920 # Admin tasks
921 #
922
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command name, used when logging a missing ACK.

    After a successful ACK this polls for up to 5 seconds for the
    connection to go bad or for incoming data, then disconnects.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # poll a little while for server to disconnect.  (The elapsed time
        # check used to be '>= 5000', so the loop body never executed.)
        uMsStart = base.timestampMilli();
        while self.oTransport.isConnectionOk() \
          and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
935
def taskReboot(self):
    """
    Sends REBOOT and, if that worked, waits for the ACK using
    hlpRebootShutdownWaitForAck.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('REBOOT');
    if fRc is not True:
        return fRc;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
941
def taskShutdown(self):
    """
    Sends SHUTDOWN and, if that worked, waits for the ACK using
    hlpRebootShutdownWaitForAck.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('SHUTDOWN');
    if fRc is not True:
        return fRc;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
947
948 #
949 # CD/DVD control tasks.
950 #
951
952 ## TODO
953
954 #
955 # File system tasks
956 #
957
def taskMkDir(self, sRemoteDir, fMode):
    """
    Sends MKDIR with (fMode, sRemoteDir) as payload and waits for the ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('MKDIR');
963
def taskMkDirPath(self, sRemoteDir, fMode):
    """
    Sends MKDRPATH with (fMode, sRemoteDir) as payload and waits for the
    ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('MKDRPATH');
969
def taskMkSymlink(self, sLinkTarget, sLink):
    """
    Sends MKSYMLNK with (sLinkTarget, sLink) as payload and waits for the
    ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('MKSYMLNK');
975
def taskRmDir(self, sRemoteDir):
    """
    Sends RMDIR with sRemoteDir as payload and waits for the ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('RMDIR', (sRemoteDir,));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('RMDIR');
981
def taskRmFile(self, sRemoteFile):
    """
    Sends RMFILE with sRemoteFile as payload and waits for the ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('RMFILE', (sRemoteFile,));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('RMFILE');
987
def taskRmSymlink(self, sRemoteSymlink):
    """
    Sends RMSYMLNK with sRemoteSymlink as payload and waits for the ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('RMSYMLNK');
993
def taskRmTree(self, sRemoteTree):
    """
    Sends RMTREE with sRemoteTree as payload and waits for the ACK.

    Returns True on success, otherwise the failing status.
    """
    fRc = self.sendMsg('RMTREE', (sRemoteTree,));
    if fRc is not True:
        return fRc;
    return self.recvAckLogged('RMTREE');
999
1000 #def "CHMOD "
1001 #def "CHOWN "
1002 #def "CHGRP "
1003
def taskIsDir(self, sRemoteDir):
    """
    Sends ISDIR with sRemoteDir as payload and waits for the TRUE/FALSE
    reply.

    Returns the recvTrueFalse result if the request was sent, otherwise the
    sendMsg status.
    """
    fRc = self.sendMsg('ISDIR', (sRemoteDir,));
    if fRc is not True:
        return fRc;
    return self.recvTrueFalse('ISDIR');
1009
def taskIsFile(self, sRemoteFile):
    """
    Sends ISFILE with sRemoteFile as payload and waits for the TRUE/FALSE
    reply.

    Returns the recvTrueFalse result if the request was sent, otherwise the
    sendMsg status.
    """
    fRc = self.sendMsg('ISFILE', (sRemoteFile,));
    if fRc is not True:
        return fRc;
    return self.recvTrueFalse('ISFILE');
1015
def taskIsSymlink(self, sRemoteSymlink):
    """
    Sends ISSYMLNK with sRemoteSymlink as payload and waits for the
    TRUE/FALSE reply.

    Returns the recvTrueFalse result if the request was sent, otherwise the
    sendMsg status.
    """
    fRc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fRc is not True:
        return fRc;
    return self.recvTrueFalse('ISSYMLNK');
1021
1022 #def "STAT "
1023 #def "LSTAT "
1024 #def "LIST "
1025
def taskUploadFile(self, sLocalFile, sRemoteFile):
    """
    Uploads the local file sLocalFile to sRemoteFile on the TXS side.

    Returns False (error in the log) if the local file cannot be opened,
    otherwise the taskUploadCommon status.
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr.  The finally makes sure the file is
    # closed also when the worker raises an exception.
    try:
        rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
    finally:
        oLocalFile.close();
    return rc;
1043
def taskUploadString(self, sContent, sRemoteFile):
    """
    Uploads the string sContent as the remote file sRemoteFile.

    The string is wrapped in a minimal file-like reader and handed to
    taskUploadCommon, whose return value is passed on unchanged.
    """
    class InStringFile(object): # pylint: disable=R0903
        """ Sequential read-only view of an in-memory string. """
        def __init__(self, sData):
            self.sContent = sData
            self.off = 0

        def read(self, cbMax):
            """ Returns up to cbMax characters from the current offset, "" at the end. """
            offStart = self.off
            if len(self.sContent) - offStart == 0:
                return ""
            # Slicing clamps at the end of the string, so no separate
            # handling is needed for a short final chunk.
            sChunk = self.sContent[offStart:(offStart + cbMax)]
            self.off = offStart + len(sChunk)
            return sChunk

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile)
1064
def taskUploadCommon(self, oLocalFile, sRemoteFile):
    """
    Common worker used by taskUploadFile and taskUploadString.

    Sends 'PUT FILE' for sRemoteFile and, once that is acknowledged, pushes
    the content of oLocalFile (anything with a read(cbMax) method) in
    chunks of up to 64 KB.  Each packet carries the running CRC-32 of all
    data sent so far and must be acknowledged before the next one is sent.
    The end of the data is signalled with a 'DATA EOF' packet.

    Returns True if everything was sent and acknowledged.  Otherwise the
    failing status is returned, which is False for all failures detected
    in the data loop.  A local read error is logged and answered with an
    'ABORT' message.
    """
    # Command + ACK.
    rc = self.sendMsg('PUT FILE', (sRemoteFile,))
    if rc is True:
        rc = self.recvAckLogged('PUT FILE')
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32("")
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536)
            except:
                # Do not fail silently; the download path logs its local
                # I/O errors the same way.
                reporter.errorXcpt('taskUpload: I/O error reading the data for "%s"' % (sRemoteFile,))
                rc = None
                break

            # Convert to a byte array, which is what sendMsg gets as payload.
            abBuf = array.array('B', [ord(ch) for ch in sRaw])
            sRaw = None

            # Update the file stream CRC and send it off.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32)
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ))
            else:
                rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf))
            if rc is False:
                break

            # Wait for the reply.
            rc = self.recvAck()
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK')
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]))
                    rc = False
                break

            # EOF?
            if not abBuf:
                break

        # Send ABORT on local I/O errors (rc is only None in that case).
        if rc is None:
            rc = self.sendMsg('ABORT')
            if rc is True:
                self.recvAckLogged('ABORT')
            rc = False
    return rc
1120
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile.

    Returns False (logged) if the local file cannot be created, otherwise
    whatever taskDownloadCommon returns.  When the download returns False
    the local file is removed again.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb')
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile))
        return False

    # Close the local file even if the worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile)
    finally:
        oLocalFile.close()

    # Do not leave a partial file behind on failure.
    if rc is False:
        try:
            os.remove(sLocalFile)
        except:
            reporter.errorXcpt()
    return rc
1137
def taskDownloadString(self, sRemoteFile):
    """
    Downloads sRemoteFile into memory.

    Returns the joined content as a string if taskDownloadCommon returned
    True (an empty string for an empty download), otherwise the status
    taskDownloadCommon returned.
    """
    class OutStringFile(object): # pylint: disable=R0903
        """ Write-only file-like object collecting the chunks in a list. """
        def __init__(self):
            self.asContent = []

        def write(self, sBuf):
            """ Appends sBuf to the collected chunks. """
            self.asContent.append(sBuf)
            return None

    oSink = OutStringFile()
    rc = self.taskDownloadCommon(sRemoteFile, oSink)
    if rc is not True:
        return rc
    # Joining an empty list yields '', which covers the empty download.
    return ''.join(oSink.asContent)
1156
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' for sRemoteFile and then processes the replies until a
    'DATA EOF' packet arrives.  The first four payload bytes of each packet
    are read as a CRC-32 and compared with a locally maintained running
    CRC over the data received so far.  The remainder of a 'DATA' payload
    is written to oLocalFile (anything with a write() method).

    Returns True on success and False on failure.  Validation and local
    I/O failures (rc None internally) are answered with a 'NACK'.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32("");
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.  Anything but DATA / DATA EOF is treated as an error
            # reply and ends the transfer without a NACK (rc = False).
            sOpcode = sOpcode.rstrip();
            if sOpcode != 'DATA' and sOpcode != 'DATA EOF':
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            # A DATA packet needs at least the 4 CRC bytes, an EOF packet exactly 4.
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).  Only DATA packets
            # add to the running CRC; the EOF packet is checked against the
            # value accumulated so far.
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                # Done; the result of the final ACK becomes the return value.
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                oLocalFile.write(abPayload[4:].tostring());
            except:
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            # Acknowledge the packet; the loop ends if this does not return True.
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1215
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """
    Sends the UNPKFILE request with (sRemoteFile, sRemoteDir) as payload.

    Returns the send status if it is not True, otherwise the result of
    recvAckLogged('UNPKFILE').
    """
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir))
    if fSent is not True:
        return fSent
    return self.recvAckLogged('UNPKFILE')
1221
1222 # pylint: enable=C0111
1223
1224
1225 #
1226 # Public methods - generic task queries
1227 #
1228
def isSuccess(self):
    """
    Checks whether the last task completed successfully.

    Returns True when the status string is empty and the task result is
    neither False nor None, otherwise False.
    """
    # Snapshot both values under the task lock.
    self.lockTask()
    sStatusNow = self.sStatus
    oResult = self.oTaskRc
    self.unlockTask()

    fCompleted = sStatusNow == ""
    fFailed = oResult is False or oResult is None
    return fCompleted and not fFailed
1240
def getResult(self):
    """
    Gets the result of the completed task.

    Returns the task result when the status string is empty, otherwise
    None (not completed yet or no previous task).
    """
    # Snapshot both values under the task lock.
    self.lockTask()
    sStatusNow = self.sStatus
    oResult = self.oTaskRc
    self.unlockTask()
    return oResult if sStatusNow == "" else None
1253
def getLastReply(self):
    """
    Gets the last reply as a (cbMsg, sOpcode, abPayload) three-tuple.

    The tuple is (None, None, None) if there was no last reply.  The value
    of self.t3oReply is read while holding the task lock.
    """
    self.lockTask()
    t3oSnapshot = self.t3oReply
    self.unlockTask()
    return t3oSnapshot
1263
1264 #
1265 # Public methods - connection.
1266 #
1267
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off the disconnect ("bye") task.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure.
    """
    fnWorker = self.taskBye
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnWorker)
1277
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDisconnect, run through asyncToSync."""
    fnAsync = self.asyncDisconnect
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors)
1281
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a task for getting the TXS UUID.

    Returns True on success, False on failure (logged).

    The task returns UUID string (in {}) on success and False on failure.
    """
    # The operation name used to be "bye" (copied from asyncDisconnect),
    # which mislabelled this task.
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", self.taskUuid)
1291
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUuid, run through asyncToSync."""
    fnAsync = self.asyncUuid
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors)
1295
1296 #
1297 # Public methods - execution.
1298 #
1299
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an "exec" task running sExecName on the TXS side.

    Returns True if the task was started, False on failure (logged).

    Task result:
      - True if the process exited normally with status code 0.
      - None on failure prior to executing the process.
      - False if the process exited with a different status or in an
        abnormal manner.
    Both None and False are logged, and further details are available via
    getLastReply().

    oStdIn, oStdOut, oStdErr and oTestPipe select what happens to the
    respective stream:
      - None: no special action, the stream goes wherever TXS sends it.
      - '/dev/null': send to / read from the bitbucket.
      - A remote filename: redirect to/from that file.
      - '>>' followed by a remote filename: append to that file.
      - A file like object: pipe the stream to/from the TXS.  StdIn needs
        a non-blocking read() method, the others a write() method.  Watch
        out for deadlocks between StdIn and StdOut/StdErr/TestPipe piping.
    """
    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                 oStdOut, oStdErr, oTestPipe, sAsUser)
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs)
1328
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Synchronous version of asyncExecEx, run through asyncToSync.

    Note that the streams default to '/dev/null' here rather than None.
    """
    tStreams = (oStdIn, oStdOut, oStdErr, oTestPipe)
    return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv,
                            tStreams[0], tStreams[1], tStreams[2], tStreams[3],
                            sAsUser, cMsTimeout, fIgnoreErrors)
1336
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '',
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Kicks off an "exec" test task running sExecName on the TXS side.

    Returns True if the task was started, False on failure (logged).

    Task result:
      - True if the process exited normally with status code 0.
      - None on failure prior to executing the process.
      - False if the process exited with a different status or in an
        abnormal manner.
    Both None and False are logged, and further details are available via
    getLastReply().

    Standard input is taken from /dev/null.  Standard output and standard
    error are handed to reporter.FileWrapper objects named
    '<sPrefix>stdout' and '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe object, or to /dev/null when
    fWithTestPipe is False.
    """
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix)
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix)
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null'

    tTaskArgs = (sExecName, long(0), asArgs, asAddEnv, '/dev/null', oStdOut, oStdErr, oTestPipe, sAsUser)
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, tTaskArgs)
1363
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous version of asyncExec, run through asyncToSync."""
    fnAsync = self.asyncExec
    return self.asyncToSync(fnAsync, sExecName, asArgs, asAddEnv, sAsUser,
                            fWithTestPipe, sPrefix, cMsTimeout, fIgnoreErrors)
1369
1370 #
# Public methods - system control (reboot, shutdown)
1372 #
1373
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a "reboot" task.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    The session will be disconnected on successful task completion.
    """
    tNoArgs = ()
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, tNoArgs)
1384
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncReboot, run through asyncToSync."""
    fnAsync = self.asyncReboot
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors)
1388
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a "shutdown" task.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tNoArgs = ()
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, tNoArgs)
1398
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncShutdown, run through asyncToSync."""
    fnAsync = self.asyncShutdown
    return self.asyncToSync(fnAsync, cMsTimeout, fIgnoreErrors)
1402
1403
1404 #
1405 # Public methods - file system
1406 #
1407
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir task.

    sRemoteDir is the remote directory and fMode the mode to create it
    with.  The default is written as 0o700, which is the same value as the
    old 0700 spelling but is also accepted by Python 3.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)))
1417
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDir, run through asyncToSync.

    The default mode is written as 0o700 (same value as the old 0700
    spelling) so that the definition also parses on Python 3.
    """
    return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors)
1421
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Initiates a mkdir -p task.

    sRemoteDir is the remote directory path and fMode the mode to create
    it with.  The default is written as 0o700, which is the same value as
    the old 0700 spelling but is also accepted by Python 3.

    Returns True on success, False on failure (logged).

    The task returns True on success, False on failure (logged).
    """
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)))
1431
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncMkDirPath, run through asyncToSync.

    The default mode is written as 0o700 (same value as the old 0700
    spelling) so that the definition also parses on Python 3.
    """
    return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors)
1435
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a symlink task ("mkSymlink") for sLink pointing at sLinkTarget.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sLinkTarget, sLink)
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, tTaskArgs)
1445
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncMkSymlink, run through asyncToSync."""
    fnAsync = self.asyncMkSymlink
    return self.asyncToSync(fnAsync, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors)
1449
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmdir task ("rmDir") for sRemoteDir.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteDir,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, tTaskArgs)
1459
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmDir, run through asyncToSync."""
    fnAsync = self.asyncRmDir
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors)
1463
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmfile task ("rmFile") for sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, tTaskArgs)
1473
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmFile, run through asyncToSync."""
    fnAsync = self.asyncRmFile
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors)
1477
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmsymlink task ("rmSymlink") for sRemoteSymlink.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteSymlink,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, tTaskArgs)
1487
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmSymlink, run through asyncToSync."""
    fnAsync = self.asyncRmSymlink
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors)
1491
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a rmtree task ("rmTree") for sRemoteTree.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteTree,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, tTaskArgs)
1501
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncRmTree, run through asyncToSync."""
    fnAsync = self.asyncRmTree
    return self.asyncToSync(fnAsync, sRemoteTree, cMsTimeout, fIgnoreErrors)
1505
1506 #def "CHMOD "
1507 #def "CHOWN "
1508 #def "CHGRP "
1509
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-dir query task ("isDir") for sRemoteDir.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True if it's a directory, False if it isn't,
    and None on error (logged).
    """
    tTaskArgs = (sRemoteDir,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, tTaskArgs)
1520
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsDir, run through asyncToSync."""
    fnAsync = self.asyncIsDir
    return self.asyncToSync(fnAsync, sRemoteDir, cMsTimeout, fIgnoreErrors)
1524
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-file query task ("isFile") for sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True if it's a file, False if it isn't, and
    None on error (logged).
    """
    tTaskArgs = (sRemoteFile,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, tTaskArgs)
1535
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsFile, run through asyncToSync."""
    fnAsync = self.asyncIsFile
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors)
1539
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an is-symbolic-link query task ("isSymlink") for sRemoteSymlink.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True if it's a symbolic link, False if it
    isn't, and None on error (logged).
    """
    tTaskArgs = (sRemoteSymlink,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, tTaskArgs)
1550
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncIsSymlink, run through asyncToSync."""
    fnAsync = self.asyncIsSymlink
    return self.asyncToSync(fnAsync, sRemoteSymlink, cMsTimeout, fIgnoreErrors)
1554
1555 #def "STAT "
1556 #def "LSTAT "
1557 #def "LIST "
1558
def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an upload file task ("upload"), sending the local file
    sLocalFile to sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sLocalFile, sRemoteFile)
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, tTaskArgs)
1568
def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadFile, run through asyncToSync."""
    fnAsync = self.asyncUploadFile
    return self.asyncToSync(fnAsync, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors)
1572
def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an upload string task ("uploadString"), sending sContent to
    sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sContent, sRemoteFile)
    return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, tTaskArgs)
1582
def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUploadString, run through asyncToSync."""
    fnAsync = self.asyncUploadString
    return self.asyncToSync(fnAsync, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors)
1586
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a download file task ("downloadFile"), fetching sRemoteFile
    into the local file sLocalFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sLocalFile)
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, tTaskArgs)
1596
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadFile, run through asyncToSync."""
    fnAsync = self.asyncDownloadFile
    return self.asyncToSync(fnAsync, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors)
1600
def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off a download string task ("downloadString") for sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself returns a byte string on success and False on failure
    (logged).
    """
    tTaskArgs = (sRemoteFile,)
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, tTaskArgs)
1610
def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncDownloadString, run through asyncToSync."""
    fnAsync = self.asyncDownloadString
    return self.asyncToSync(fnAsync, sRemoteFile, cMsTimeout, fIgnoreErrors)
1614
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Kicks off an unpack file task ("unpackFile") for sRemoteFile and
    sRemoteDir.

    Returns True if the task was started, False on failure (logged).

    The task itself returns True on success and False on failure (logged).
    """
    tTaskArgs = (sRemoteFile, sRemoteDir)
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, tTaskArgs)
1625
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous version of asyncUnpackFile, run through asyncToSync."""
    fnAsync = self.asyncUnpackFile
    return self.asyncToSync(fnAsync, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors)
1629
1630
1631class TransportTcp(TransportBase):
1632 """
1633 TCP transport layer for the TXS client session class.
1634 """
1635
def __init__(self, sHostname, uPort, fReversedSetup):
    """
    Stores the connection parameters.  No connection is made here; the
    session will call us back to make the connection later on its worker
    thread.

    When uPort is None the port defaults to 5042, unless fReversedSetup is
    something other than False, in which case it defaults to 5048.
    """
    TransportBase.__init__(self, utils.getCallerName())
    self.sHostname = sHostname
    self.fReversedSetup = fReversedSetup
    if uPort is not None:
        self.uPort = uPort
    elif fReversedSetup is False:
        self.uPort = 5042
    else:
        self.uPort = 5048

    # Connection state; nothing is connected or being connected yet.
    self.oSocket = None
    self.oWakeupW = None
    self.oWakeupR = None
    self.fConnectCanceled = False
    self.fIsConnecting = False
    self.oCv = threading.Condition()
    self.abReadAhead = array.array('B')
1652
def toString(self):
    """
    Formats the transport state as a string: the TransportBase.toString()
    text followed by the TCP specific attributes.
    """
    tValues = (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
               self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead)
    sFormat = '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,' \
              ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>'
    return sFormat % tValues
1658
def __isInProgressXcpt(self, oXcpt):
    """
    In progress exception?

    Returns True if oXcpt is a socket.error whose first argument is
    errno.EINPROGRESS or errno.EWOULDBLOCK (the latter was added for
    Windows), otherwise False.  Never raises.
    """
    # Note: The error code is taken from oXcpt.args rather than by indexing
    #       the exception.  Indexing is only supported by Python 2, and as
    #       failures are swallowed here the check would otherwise quietly
    #       return False for everything on Python 3.
    try:
        if not isinstance(oXcpt, socket.error):
            return False
        iErrno = oXcpt.args[0]
    except Exception:
        return False
    for sName in ('EINPROGRESS', 'EWOULDBLOCK'):
        if hasattr(errno, sName) and iErrno == getattr(errno, sName):
            return True
    return False
1675
def __isWouldBlockXcpt(self, oXcpt):
    """
    Would block exception?

    Returns True if oXcpt is a socket.error whose first argument is
    errno.EWOULDBLOCK or errno.EAGAIN, otherwise False.  Never raises.
    """
    # Note: The error code is taken from oXcpt.args rather than by indexing
    #       the exception.  Indexing is only supported by Python 2, and as
    #       failures are swallowed here the check would otherwise quietly
    #       return False for everything on Python 3.
    try:
        if not isinstance(oXcpt, socket.error):
            return False
        iErrno = oXcpt.args[0]
    except Exception:
        return False
    for sName in ('EWOULDBLOCK', 'EAGAIN'):
        if hasattr(errno, sName) and iErrno == getattr(errno, sName):
            return True
    return False
1691
def __isConnectionReset(self, oXcpt):
    """
    Connection reset by Peer or others.

    Returns True if oXcpt is a socket.error whose first argument is
    errno.ECONNRESET or errno.ENETRESET, otherwise False.  Never raises.
    """
    # Note: The error code is taken from oXcpt.args rather than by indexing
    #       the exception.  Indexing is only supported by Python 2, and as
    #       failures are swallowed here the check would otherwise quietly
    #       return False for everything on Python 3.
    try:
        if not isinstance(oXcpt, socket.error):
            return False
        iErrno = oXcpt.args[0]
    except Exception:
        return False
    for sName in ('ECONNRESET', 'ENETRESET'):
        if hasattr(errno, sName) and iErrno == getattr(errno, sName):
            return True
    return False
1707
def _closeWakeupSockets(self):
    """
    Closes the wakeup sockets, read end first.  Caller should own the CV.

    Each attribute is set to None before the socket it held is closed.
    Always returns None.
    """
    for sAttr in ('oWakeupR', 'oWakeupW'):
        oWakeupSock = getattr(self, sAttr)
        setattr(self, sAttr, None)
        if oWakeupSock is not None:
            oWakeupSock.close()
    return None
1721
def cancelConnect(self):
    """
    Cancels a connect() that is pending or about to be made.

    Sets fConnectCanceled and, if a connect is in progress, closes the
    connecting socket and pokes the write end of the wakeup socket pair
    (if there is one) before closing it.
    """
    # This is bad stuff.
    self.oCv.acquire()
    try:
        reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket))
        self.fConnectCanceled = True
        if self.fIsConnecting:
            # Detach the socket from the object before closing it.
            oSocket = self.oSocket
            self.oSocket = None
            if oSocket is not None:
                reporter.log2('TransportTcp::cancelConnect: closing the socket')
                oSocket.close()

            # Write to the wakeup socket as well, since closing the socket
            # alone does not wake up select on Linux (see connect()).
            oWakeupW = self.oWakeupW
            self.oWakeupW = None
            if oWakeupW is not None:
                reporter.log2('TransportTcp::cancelConnect: wakeup call')
                try: oWakeupW.send('cancelled!\n')
                except: reporter.logXcpt()
                try: oWakeupW.shutdown(socket.SHUT_WR)
                except: reporter.logXcpt()
                oWakeupW.close()
    finally:
        # Release the lock even if closing or logging raised.
        self.oCv.release()
1744
def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as server, i.e. the reversed setup.

    Binds oSocket to (self.sHostname, self.uPort), listens and accepts one
    connection.  When accept() reports that the operation is in progress,
    waits up to cMsTimeout milliseconds for oSocket or oWakeupR before
    trying once more.

    Returns True when a connection was accepted, False on timeout and None
    on failure or cancellation (logged).  Unexpected socket errors from
    accept() and select() are re-raised.
    """
    assert(self.fReversedSetup)

    reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout))

    # Workaround for bind() failure...
    try:
        oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except:
        # This used to be reported as a listen() failure.
        reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed')
        return None

    # Bind the socket and make it listen.
    try:
        oSocket.bind((self.sHostname, self.uPort))
    except:
        reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort))
        return None
    try:
        oSocket.listen(1)
    except:
        reporter.errorXcpt('socket.listen(1) failed')
        return None

    # Accept connections.
    oClientSocket = None
    tClientAddr = None
    try:
        (oClientSocket, tClientAddr) = oSocket.accept()
    except socket.error as oXcpt:
        if not self.__isInProgressXcpt(oXcpt):
            raise

        # Do the actual waiting.
        reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcpt,))
        try:
            select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0)
        except socket.error as oXcptSelect:
            # EBADF after a cancel means cancelConnect() closed the socket.
            if oXcptSelect.args[0] != errno.EBADF or not self.fConnectCanceled:
                raise
            reporter.log('socket.select() on accept was canceled')
            return None
        except:
            reporter.logXcpt('socket.select() on accept')

        # Try accept again.
        try:
            (oClientSocket, tClientAddr) = oSocket.accept()
        except socket.error as oXcptAccept:
            if not self.__isInProgressXcpt(oXcptAccept):
                if oXcptAccept.args[0] != errno.EBADF or not self.fConnectCanceled:
                    raise
                reporter.log('socket.accept() was canceled')
                return None
            # Still nothing to accept after waiting.
            reporter.log('socket.accept() timed out')
            return False
        except:
            reporter.errorXcpt('socket.accept() failed')
            return None
    except:
        reporter.errorXcpt('socket.accept() failed')
        return None

    # Store the connected socket and throw away the server socket.
    self.oCv.acquire()
    try:
        if not self.fConnectCanceled:
            self.oSocket.close()
            self.oSocket = oClientSocket
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1])
        else:
            # Canceled while accepting: nobody takes ownership of the client
            # socket, so close it here instead of leaving it to the garbage
            # collector.  The return value is kept as before.
            try: oClientSocket.close()
            except: reporter.logXcpt()
    finally:
        self.oCv.release()
    return True
1817
def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
    """
    Connects to the TXS server as client.

    Starts a connect on oSocket to (self.sHostname, self.uPort).  When the
    connect is reported as being in progress, waits up to cMsTimeout
    milliseconds for the socket to become writable or fail and then reads
    the outcome from SO_ERROR.

    Returns True when connected and False when the caller should try again
    (connection refused, host/network unreachable or down, interrupted,
    timed out).  Other failures are reported via reporter.fatalXcpt unless
    they are the EBADF of a canceled connect; the return value is then
    None, or False if the error came from SO_ERROR.
    """
    assert(not self.fReversedSetup)

    # Error codes after which the connect attempt may simply be repeated.
    tiRetryErrnos = (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR,
                     errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT)

    # Connect w/ timeouts.
    rc = None
    try:
        oSocket.connect((self.sHostname, self.uPort))
        rc = True
    except socket.error as oXcpt:
        iRc = oXcpt.args[0]
        if self.__isInProgressXcpt(oXcpt):
            # Do the actual waiting.
            reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,))
            try:
                ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0)
                # Only oWakeupR readable or nothing at all: treat as timeout.
                if len(ttRc[1]) + len(ttRc[2]) == 0:
                    raise socket.error(errno.ETIMEDOUT, 'select timed out')
                iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                rc = iRc == 0
            except socket.error as oXcptWait:
                iRc = oXcptWait.args[0]
            except:
                iRc = -42
                reporter.fatalXcpt('socket.select() on connect failed')

        if rc is True:
            pass
        elif iRc in tiRetryErrnos:
            rc = False # try again.
        else:
            # EBADF after a cancel means cancelConnect() closed the socket.
            if iRc != errno.EBADF or not self.fConnectCanceled:
                reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc))
        reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc))
    except:
        reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort))
    return rc
1860
1861
def connect(self, cMsTimeout):
    """
    Makes the TCP connection, as client or as server depending on
    self.fReversedSetup.

    cMsTimeout is the timeout in milliseconds handed to the connect
    workers.

    Returns True when connected.  Otherwise returns the False or None the
    connect worker reported, False if the connect was canceled after it
    succeeded, or None if the socket could not be created or the connect
    was canceled before it started.
    """
    # Create a non-blocking socket.
    reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort))
    try:
        oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except:
        reporter.fatalXcpt('socket.socket() failed')
        return None
    try:
        oSocket.setblocking(0)
    except:
        oSocket.close()
        # This used to be reported as a socket.socket() failure.
        reporter.fatalXcpt('socket.setblocking(0) failed')
        return None

    # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
    oWakeupR = None
    oWakeupW = None
    if hasattr(socket, 'socketpair'):
        try: (oWakeupR, oWakeupW) = socket.socketpair() # pylint: disable=E1101
        except: reporter.logXcpt('socket.socketpair() failed')

    # Update the state.
    self.oCv.acquire()
    rc = None
    if not self.fConnectCanceled:
        self.oSocket = oSocket
        self.oWakeupW = oWakeupW
        self.oWakeupR = oWakeupR
        self.fIsConnecting = True
        self.oCv.release()

        # Try connect.  (The lock is not held while waiting.)
        if oWakeupR is None:
            oWakeupR = oSocket # Avoid select failure.
        if self.fReversedSetup:
            rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout)
        else:
            rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout)
        oSocket = None

        # Update the state and cleanup on failure/cancel.
        self.oCv.acquire()
        if rc is True and self.fConnectCanceled:
            rc = False
        self.fIsConnecting = False
    else:
        # Canceled before we got started.  The new sockets were never
        # stored on the object, so close them here instead of leaving
        # them to the garbage collector.
        for oUnused in (oSocket, oWakeupR, oWakeupW):
            if oUnused is not None:
                try: oUnused.close()
                except: reporter.logXcpt()

    if rc is not True:
        if self.oSocket is not None:
            self.oSocket.close()
            self.oSocket = None
        self._closeWakeupSockets()
    self.oCv.release()

    reporter.log2('TransportTcp::connect: returning %s' % (rc,))
    return rc
1918
def disconnect(self, fQuiet = False):
    """
    Disconnects from the peer, if connected, and closes the wakeup sockets.

    A connected socket is shut down for writing and drained before it is
    closed.  A shutdown failure is reported as an error unless fQuiet is
    set; drain failures are ignored.
    """
    fHaveSocket = self.oSocket is not None
    if fHaveSocket:
        self.abReadAhead = array.array('B')

        # Try a shutting down the socket gracefully (draining it).
        try:
            self.oSocket.shutdown(socket.SHUT_WR)
        except:
            if not fQuiet:
                reporter.error('shutdown(SHUT_WR)')
        try:
            self.oSocket.setblocking(0) # just in case it's not set.
            # Read until recv returns nothing or raises.
            while self.oSocket.recv(16384):
                pass
        except:
            pass

    # The actual closing is done while owning the CV.
    self.oCv.acquire()
    if fHaveSocket:
        try: self.oSocket.setblocking(1)
        except: pass
        self.oSocket.close()
        self.oSocket = None
    self._closeWakeupSockets()
    self.oCv.release()
1947
def sendBytes(self, abBuf, cMsTimeout):
    """
    Sends all of abBuf over the socket.

    One send is attempted right away.  If that does not get everything out
    (partial send or would-block), the remainder is sent from a select()
    driven loop which gives up once more than cMsTimeout milliseconds have
    elapsed since the loop started.

    Returns True when every byte has been sent.  Returns False, after
    reporting the problem via the reporter module, when there is no
    connection, on timeout, or on a select/send failure.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.sendBytes: No connection.');
        return False;

    # Try send it all.
    try:
        cbSent = self.oSocket.send(abBuf);
        if cbSent == len(abBuf):
            return True;
    except Exception as oXcpt: # 'as' form works with both Python 2.6+ and 3.x.
        if not self.__isWouldBlockXcpt(oXcpt):
            reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
            return False;
        cbSent = 0; # Would block, so nothing has gone out yet.

    # Do a timed send.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
            break;

        # wait.
        try:
            ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            if ttRc[2] and not ttRc[1]:
                reporter.error('TranportTcp.sendBytes: select returned with exception');
                break;
            if not ttRc[1]:
                # Neither writable nor in the exception set: select timed out.
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                break;
        except:
            reporter.errorXcpt('TranportTcp.sendBytes: select failed');
            break;

        # Try send more.
        try:
            cbSent += self.oSocket.send(abBuf[cbSent:]);
            if cbSent == len(abBuf):
                return True;
        except Exception as oXcpt:
            # A would-block here just means another round of select.
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                break;

    return False;
1996
def __returnReadAheadBytes(self, cb):
    """
    Internal worker for recvBytes.

    Removes the first cb bytes from self.abReadAhead and returns them; the
    remainder stays in self.abReadAhead.  The caller must make sure at least
    cb bytes are buffered (asserted).
    """
    abBuffered = self.abReadAhead;
    assert(cb <= len(abBuffered));
    abHead, self.abReadAhead = abBuffered[:cb], abBuffered[cb:];
    return abHead;
2003
def recvBytes(self, cb, cMsTimeout, fNoDataOk):
    """
    Receives exactly cb bytes.

    Received data is accumulated in self.abReadAhead.  One recv is attempted
    right away; if that does not complete the request, a select() driven loop
    keeps reading until cb bytes are buffered or more than cMsTimeout
    milliseconds have elapsed since the loop started.

    fNoDataOk suppresses the error report for a timeout or a connection reset
    when nothing at all is buffered; it does not change the return value.

    Returns the first cb bytes of the read-ahead buffer on success, None
    otherwise.  When recv reports that the peer has shut the connection down,
    disconnect() is called before returning None.
    """
    if self.oSocket is None:
        reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
        return None;

    # Try read in some more data without bothering with timeout handling first.
    if len(self.abReadAhead) < cb:
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if abBuf:
                self.abReadAhead.extend(array.array('B', abBuf));
        except Exception as oXcpt: # 'as' form works with both Python 2.6+ and 3.x.
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                return None;

    if len(self.abReadAhead) >= cb:
        return self.__returnReadAheadBytes(cb);

    # Timeout loop.
    msStart = base.timestampMilli();
    while True:
        cMsElapsed = base.timestampMilli() - msStart;
        if cMsElapsed > cMsTimeout:
            if not fNoDataOk or self.abReadAhead:
                reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
            break;

        # Wait.
        try:
            ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
            if ttRc[2] and not ttRc[0]:
                reporter.error('TranportTcp.recvBytes: select returned with exception');
                break;
            if not ttRc[0]:
                # Neither readable nor in the exception set: select timed out.
                if not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                break;
        except:
            reporter.errorXcpt('TranportTcp.recvBytes: select failed');
            break;

        # Try read more.
        try:
            abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
            if not abBuf:
                # Readable but no data: the other end has shut down.
                reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                               % (len(self.abReadAhead), cb, fNoDataOk));
                self.disconnect();
                return None;

            self.abReadAhead.extend(array.array('B', abBuf));

        except Exception as oXcpt:
            reporter.log('recv => exception %s' % (oXcpt,));
            # A would-block here just means another round of select.
            if not self.__isWouldBlockXcpt(oXcpt):
                if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                    reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                break;

        # Done?
        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

    #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
    return None;
2071
def isConnectionOk(self):
    """
    Checks whether the connection looks usable.

    Returns False if there is no socket, if a zero-timeout select() puts the
    socket in its exceptional-condition set, or if the select() or the
    zero-byte probe send raises.  Returns True otherwise.
    """
    if self.oSocket is None:
        return False;

    fOk = False;
    try:
        (_, _, aoInError) = select.select([], [], [self.oSocket], 0.0);
        if not aoInError:
            self.oSocket.send(array.array('B')); # send zero bytes.
            fOk = True;
    except:
        return False;
    return fOk;
2084
def isRecvPending(self, cMsTimeout = 0):
    """
    Checks whether the socket is readable, waiting up to cMsTimeout
    milliseconds in select().

    Returns False only when select() completes without reporting the socket
    as readable.  Returns True when it is readable, and also when select()
    raises.
    """
    try:
        aoReadable = select.select([self.oSocket], [], [], cMsTimeout / 1000.0)[0];
    except:
        return True;
    if aoReadable:
        return True;
    return False;
2093
2094
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    Logs the arguments, then builds a TransportTcp from sHostname, uPort and
    fReversedSetup and wraps it in a Session together with cMsTimeout and
    cMsIdleFudge.

    Returns the Session object, or None if constructing either object raised
    (the exception is reported via reporter.errorXcpt).
    """
    tArgs = (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge);
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % tArgs);
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup), cMsTimeout, cMsIdleFudge);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2108
2109
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    This differs from openTcpSession in that it won't log a connection failure
    as an error; to that end the Session is created with fTryConnect = True.
    An exception raised while constructing the transport or the session is
    still reported via reporter.errorXcpt.

    Returns the Session object, or None if an exception was raised.
    """
    try:
        oSession = Session(TransportTcp(sHostname, uPort, fReversedSetup),
                           cMsTimeout, cMsIdleFudge, fTryConnect = True);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2124
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette