VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 57952

Last change on this file since 57952 was 56295, checked in by vboxsync, 10 years ago

ValidationKit: Updated (C) year.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 78.7 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 56295 2015-06-09 14:29:55Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2015 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 56295 $"
30
31# Standard Python imports.
32import array
33import errno
34import os
35import select
36import socket
37import threading
38import time
39import types
40import zlib
41import uuid
42
43# Validation Kit imports.
44from common import utils;
45from testdriver import base;
46from testdriver import reporter;
47from testdriver.base import TdTaskBase;
48
49#
50# Helpers for decoding data received from the TXS.
51# These are used both the Session and Transport classes.
52#
53
54def getU32(abData, off):
55 """Get a U32 field."""
56 return abData[off] \
57 + abData[off + 1] * 256 \
58 + abData[off + 2] * 65536 \
59 + abData[off + 3] * 16777216;
60
61def getSZ(abData, off, sDefault = None):
62 """
63 Get a zero-terminated string field.
64 Returns sDefault if the string is invalid.
65 """
66 cchStr = getSZLen(abData, off);
67 if cchStr >= 0:
68 abStr = abData[off:(off + cchStr)];
69 try:
70 return abStr.tostring().decode('utf_8');
71 except:
72 reporter.errorXcpt('getSZ(,%u)' % (off));
73 return sDefault;
74
75def getSZLen(abData, off):
76 """
77 Get the length of a zero-terminated string field, in bytes.
78 Returns -1 if off is beyond the data packet or not properly terminated.
79 """
80 cbData = len(abData);
81 if off >= cbData:
82 return -1;
83
84 offCur = off;
85 while abData[offCur] != 0:
86 offCur = offCur + 1;
87 if offCur >= cbData:
88 return -1;
89
90 return offCur - off;
91
92def isValidOpcodeEncoding(sOpcode):
93 """
94 Checks if the specified opcode is valid or not.
95 Returns True on success.
96 Returns False if it is invalid, details in the log.
97 """
98 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
99 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
100 if len(sOpcode) != 8:
101 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
102 return False;
103 for i in range(0, 1):
104 if sSet1.find(sOpcode[i]) < 0:
105 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
106 return False;
107 for i in range(2, 7):
108 if sSet2.find(sOpcode[i]) < 0:
109 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
110 return False;
111 return True;
112
113#
114# Helper for encoding data sent to the TXS.
115#
116
117def u32ToByteArray(u32):
118 """Encodes the u32 value as a little endian byte (B) array."""
119 return array.array('B', \
120 ( u32 % 256, \
121 (u32 / 256) % 256, \
122 (u32 / 65536) % 256, \
123 (u32 / 16777216) % 256) );
124
125
126
127class TransportBase(object):
128 """
129 Base class for the transport layer.
130 """
131
132 def __init__(self, sCaller):
133 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
134 self.fDummy = 0;
135 self.abReadAheadHdr = array.array('B');
136
137 def toString(self):
138 """
139 Stringify the instance for logging and debugging.
140 """
141 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
142
143 def __str__(self):
144 return self.toString();
145
146 def cancelConnect(self):
147 """
148 Cancels any pending connect() call.
149 Returns None;
150 """
151 return None;
152
153 def connect(self, cMsTimeout):
154 """
155 Quietly attempts to connect to the TXS.
156
157 Returns True on success.
158 Returns False on retryable errors (no logging).
159 Returns None on fatal errors with details in the log.
160
161 Override this method, don't call super.
162 """
163 _ = cMsTimeout;
164 return False;
165
166 def disconnect(self, fQuiet = False):
167 """
168 Disconnect from the TXS.
169
170 Returns True.
171
172 Override this method, don't call super.
173 """
174 _ = fQuiet;
175 return True;
176
177 def sendBytes(self, abBuf, cMsTimeout):
178 """
179 Sends the bytes in the buffer abBuf to the TXS.
180
181 Returns True on success.
182 Returns False on failure and error details in the log.
183
184 Override this method, don't call super.
185
186 Remarks: len(abBuf) is always a multiple of 16.
187 """
188 _ = abBuf; _ = cMsTimeout;
189 return False;
190
191 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
192 """
193 Receive cb number of bytes from the TXS.
194
195 Returns the bytes (array('B')) on success.
196 Returns None on failure and error details in the log.
197
198 Override this method, don't call super.
199
200 Remarks: cb is always a multiple of 16.
201 """
202 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
203 return False;
204
205 def isConnectionOk(self):
206 """
207 Checks if the connection is OK.
208
209 Returns True if it is.
210 Returns False if it isn't (caller should call diconnect).
211
212 Override this method, don't call super.
213 """
214 return True;
215
216 def isRecvPending(self, cMsTimeout = 0):
217 """
218 Checks if there is incoming bytes, optionally waiting cMsTimeout
219 milliseconds for something to arrive.
220
221 Returns True if there is, False if there isn't.
222
223 Override this method, don't call super.
224 """
225 _ = cMsTimeout;
226 return False;
227
228 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
229 """
230 Sends a message (opcode + encoded payload).
231
232 Returns True on success.
233 Returns False on failure and error details in the log.
234 """
235 # Fix + check the opcode.
236 if len(sOpcode) < 2:
237 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
238 return False;
239 sOpcode = sOpcode.ljust(8);
240 if not isValidOpcodeEncoding(sOpcode):
241 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
242 return False;
243
244 # Start construct the message.
245 cbMsg = 16 + len(abPayload);
246 abMsg = array.array('B');
247 abMsg.extend(u32ToByteArray(cbMsg));
248 abMsg.extend((0, 0, 0, 0)); # uCrc32
249 try:
250 abMsg.extend(array.array('B', \
251 ( ord(sOpcode[0]), \
252 ord(sOpcode[1]), \
253 ord(sOpcode[2]), \
254 ord(sOpcode[3]), \
255 ord(sOpcode[4]), \
256 ord(sOpcode[5]), \
257 ord(sOpcode[6]), \
258 ord(sOpcode[7]) ) ) );
259 if len(abPayload) > 0:
260 abMsg.extend(abPayload);
261 except:
262 reporter.fatalXcpt('sendMsgInt: packing problem...');
263 return False;
264
265 # checksum it, padd it and send it off.
266 uCrc32 = zlib.crc32(abMsg[8:]);
267 abMsg[4:8] = u32ToByteArray(uCrc32);
268
269 while len(abMsg) % 16:
270 abMsg.append(0);
271
272 reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
273 return self.sendBytes(abMsg, cMsTimeout);
274
275 def recvMsg(self, cMsTimeout, fNoDataOk = False):
276 """
277 Receives a message from the TXS.
278
279 Returns the message three-tuple: length, opcode, payload.
280 Returns (None, None, None) on failure and error details in the log.
281 """
282
283 # Read the header.
284 if len(self.abReadAheadHdr) > 0:
285 assert(len(self.abReadAheadHdr) == 16);
286 abHdr = self.abReadAheadHdr;
287 self.abReadAheadHdr = array.array('B');
288 else:
289 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
290 if abHdr is None:
291 return (None, None, None);
292 if len(abHdr) != 16:
293 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
294 return (None, None, None);
295
296 # Unpack and validate the header.
297 cbMsg = getU32(abHdr, 0);
298 uCrc32 = getU32(abHdr, 4);
299 sOpcode = abHdr[8:16].tostring().decode('ascii');
300
301 if cbMsg < 16:
302 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
303 return (None, None, None);
304 if cbMsg > 1024*1024:
305 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
306 return (None, None, None);
307 if not isValidOpcodeEncoding(sOpcode):
308 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
309 return (None, None, None);
310
311 # Get the payload (if any), dropping the padding.
312 abPayload = array.array('B');
313 if cbMsg > 16:
314 if cbMsg % 16:
315 cbPadding = 16 - (cbMsg % 16);
316 else:
317 cbPadding = 0;
318 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
319 if abPayload is None:
320 self.abReadAheadHdr = abHdr;
321 if not fNoDataOk :
322 reporter.log('recvMsg: failed to recv payload bytes!');
323 return (None, None, None);
324
325 while cbPadding > 0:
326 abPayload.pop();
327 cbPadding = cbPadding - 1;
328
329 # Check the CRC-32.
330 if uCrc32 != 0:
331 uActualCrc32 = zlib.crc32(abHdr[8:]);
332 if cbMsg > 16:
333 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
334 uActualCrc32 = uActualCrc32 & 0xffffffff;
335 if uCrc32 != uActualCrc32:
336 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
337 return (None, None, None);
338
339 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
340 return (cbMsg, sOpcode, abPayload);
341
342 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
343 """
344 Sends a message (opcode + payload tuple).
345
346 Returns True on success.
347 Returns False on failure and error details in the log.
348 Returns None if you pass the incorrectly typed parameters.
349 """
350 # Encode the payload.
351 abPayload = array.array('B');
352 for o in aoPayload:
353 try:
354 if isinstance(o, basestring):
355 # the primitive approach...
356 sUtf8 = o.encode('utf_8');
357 for i in range(0, len(sUtf8)):
358 abPayload.append(ord(sUtf8[i]))
359 abPayload.append(0);
360 elif isinstance(o, types.LongType):
361 if o < 0 or o > 0xffffffff:
362 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
363 return None;
364 abPayload.extend(u32ToByteArray(o));
365 elif isinstance(o, array.array):
366 abPayload.extend(o);
367 else:
368 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
369 return None;
370 except:
371 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
372 return None;
373 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
374
375
376class Session(TdTaskBase):
377 """
378 A Test eXecution Service (TXS) client session.
379 """
380
381 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
382 """
383 Construct a TXS session.
384
385 This starts by connecting to the TXS and will enter the signalled state
386 when connected or the timeout has been reached.
387 """
388 TdTaskBase.__init__(self, utils.getCallerName());
389 self.oTransport = oTransport;
390 self.sStatus = "";
391 self.cMsTimeout = 0;
392 self.fErr = True; # Whether to report errors as error.
393 self.msStart = 0;
394 self.oThread = None;
395 self.fnTask = self.taskDummy;
396 self.aTaskArgs = None;
397 self.oTaskRc = None;
398 self.t3oReply = (None, None, None);
399 self.fScrewedUpMsgState = False;
400 self.fTryConnect = fTryConnect;
401
402 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
403 raise base.GenError("startTask failed");
404
405 def __del__(self):
406 """Make sure to cancel the task when deleted."""
407 self.cancelTask();
408
409 def toString(self):
410 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
411 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
412 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
413 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
414
415 def taskDummy(self):
416 """Place holder to catch broken state handling."""
417 raise Exception();
418
419 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
420 """
421 Kicks of a new task.
422
423 cMsTimeout: The task timeout in milliseconds. Values less than
424 500 ms will be adjusted to 500 ms. This means it is
425 OK to use negative value.
426 sStatus: The task status.
427 fnTask: The method that'll execute the task.
428 aArgs: Arguments to pass to fnTask.
429
430 Returns True on success, False + error in log on failure.
431 """
432 if not self.cancelTask():
433 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
434 return False;
435
436 # Change status and make sure we're the
437 self.lockTask();
438 if self.sStatus != "":
439 self.unlockTask();
440 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
441 return False;
442 self.sStatus = "setup";
443 self.oTaskRc = None;
444 self.t3oReply = (None, None, None);
445 self.resetTaskLocked();
446 self.unlockTask();
447
448 self.cMsTimeout = max(cMsTimeout, 500);
449 self.fErr = not fIgnoreErrors;
450 self.fnTask = fnTask;
451 self.aTaskArgs = aArgs;
452 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
453 self.oThread.setDaemon(True);
454 self.msStart = base.timestampMilli();
455
456 self.lockTask();
457 self.sStatus = sStatus;
458 self.unlockTask();
459 self.oThread.start();
460
461 return True;
462
463 def cancelTask(self, fSync = True):
464 """
465 Attempts to cancel any pending tasks.
466 Returns success indicator (True/False).
467 """
468 self.lockTask();
469
470 if self.sStatus == "":
471 self.unlockTask();
472 return True;
473 if self.sStatus == "setup":
474 self.unlockTask();
475 return False;
476 if self.sStatus == "cancelled":
477 self.unlockTask();
478 return False;
479
480 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
481 if self.sStatus == 'connecting':
482 self.oTransport.cancelConnect();
483
484 self.sStatus = "cancelled";
485 oThread = self.oThread;
486 self.unlockTask();
487
488 if not fSync:
489 return False;
490
491 oThread.join(61.0);
492 return oThread.isAlive();
493
494 def taskThread(self):
495 """
496 The task thread function.
497 This does some housekeeping activities around the real task method call.
498 """
499 if not self.isCancelled():
500 try:
501 fnTask = self.fnTask;
502 oTaskRc = fnTask(*self.aTaskArgs);
503 except:
504 reporter.fatalXcpt('taskThread', 15);
505 oTaskRc = None;
506 else:
507 reporter.log('taskThread: cancelled already');
508
509 self.lockTask();
510
511 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
512 self.oTaskRc = oTaskRc;
513 self.oThread = None;
514 self.sStatus = '';
515 self.signalTaskLocked();
516
517 self.unlockTask();
518 return None;
519
520 def isCancelled(self):
521 """Internal method for checking if the task has been cancelled."""
522 self.lockTask();
523 sStatus = self.sStatus;
524 self.unlockTask();
525 if sStatus == "cancelled":
526 return True;
527 return False;
528
529 def hasTimedOut(self):
530 """Internal method for checking if the task has timed out or not."""
531 cMsLeft = self.getMsLeft();
532 if cMsLeft <= 0:
533 return True;
534 return False;
535
536 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
537 """Gets the time left until the timeout."""
538 cMsElapsed = base.timestampMilli() - self.msStart;
539 if cMsElapsed < 0:
540 return cMsMin;
541 cMsLeft = self.cMsTimeout - cMsElapsed;
542 if cMsLeft <= cMsMin:
543 return cMsMin;
544 if cMsLeft > cMsMax and cMsMax > 0:
545 return cMsMax
546 return cMsLeft;
547
548 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
549 """
550 Wrapper for TransportBase.recvMsg that stashes the response away
551 so the client can inspect it later on.
552 """
553 if cMsTimeout == None:
554 cMsTimeout = self.getMsLeft(500);
555 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
556 self.lockTask();
557 self.t3oReply = (cbMsg, sOpcode, abPayload);
558 self.unlockTask();
559 return (cbMsg, sOpcode, abPayload);
560
561 def recvAck(self, fNoDataOk = False):
562 """
563 Receives an ACK or error response from the TXS.
564
565 Returns True on success.
566 Returns False on timeout or transport error.
567 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
568 and there are always details of some sort or another.
569 """
570 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
571 if cbMsg is None:
572 return False;
573 sOpcode = sOpcode.strip()
574 if sOpcode == "ACK":
575 return True;
576 return (sOpcode, getSZ(abPayload, 0, sOpcode));
577
578 def recvAckLogged(self, sCommand, fNoDataOk = False):
579 """
580 Wrapper for recvAck and logging.
581 Returns True on success (ACK).
582 Returns False on time, transport error and errors signalled by TXS.
583 """
584 rc = self.recvAck(fNoDataOk);
585 if rc is not True and not fNoDataOk:
586 if rc is False:
587 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
588 else:
589 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
590 rc = False;
591 return rc;
592
593 def recvTrueFalse(self, sCommand):
594 """
595 Receives a TRUE/FALSE response from the TXS.
596 Returns True on TRUE, False on FALSE and None on error/other (logged).
597 """
598 cbMsg, sOpcode, abPayload = self.recvReply();
599 if cbMsg is None:
600 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
601 return None;
602
603 sOpcode = sOpcode.strip()
604 if sOpcode == "TRUE":
605 return True;
606 if sOpcode == "FALSE":
607 return False;
608 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
609 return None;
610
611 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
612 """
613 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
614 """
615 if cMsTimeout == None:
616 cMsTimeout = self.getMsLeft(500);
617 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
618
619 def asyncToSync(self, fnAsync, *aArgs):
620 """
621 Wraps an asynchronous task into a synchronous operation.
622
623 Returns False on failure, task return status on success.
624 """
625 rc = fnAsync(*aArgs);
626 if rc is False:
627 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
628 return rc;
629
630 rc = self.waitForTask(self.cMsTimeout + 5000);
631 if rc is False:
632 reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
633 self.cancelTask();
634 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
635 return False;
636
637 rc = self.getResult();
638 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
639 return rc;
640
641 #
642 # Connection tasks.
643 #
644
645 def taskConnect(self, cMsIdleFudge):
646 """Tries to connect to the TXS"""
647 while not self.isCancelled():
648 reporter.log2('taskConnect: connecting ...');
649 rc = self.oTransport.connect(self.getMsLeft(500));
650 if rc is True:
651 reporter.log('taskConnect: succeeded');
652 return self.taskGreet(cMsIdleFudge);
653 if rc is None:
654 reporter.log2('taskConnect: unable to connect');
655 return None;
656 if self.hasTimedOut():
657 reporter.log2('taskConnect: timed out');
658 if not self.fTryConnect:
659 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
660 return False;
661 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
662 if not self.fTryConnect:
663 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
664 return False;
665
666 def taskGreet(self, cMsIdleFudge):
667 """Greets the TXS"""
668 rc = self.sendMsg("HOWDY", ());
669 if rc is True:
670 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
671 if rc is True:
672 while cMsIdleFudge > 0:
673 cMsIdleFudge -= 1000;
674 time.sleep(1);
675 else:
676 self.oTransport.disconnect(self.fTryConnect);
677 return rc;
678
679 def taskBye(self):
680 """Says goodbye to the TXS"""
681 rc = self.sendMsg("BYE");
682 if rc is True:
683 rc = self.recvAckLogged("BYE");
684 self.oTransport.disconnect();
685 return rc;
686
687 def taskUuid(self):
688 """Gets the TXS UUID"""
689 rc = self.sendMsg("UUID");
690 if rc is True:
691 rc = False;
692 cbMsg, sOpcode, abPayload = self.recvReply();
693 if cbMsg is not None:
694 sOpcode = sOpcode.strip()
695 if sOpcode == "ACK UUID":
696 sUuid = getSZ(abPayload, 0);
697 if sUuid is not None:
698 sUuid = '{%s}' % (sUuid,)
699 try:
700 _ = uuid.UUID(sUuid);
701 rc = sUuid;
702 except:
703 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
704 else:
705 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
706 else:
707 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
708 else:
709 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
710 return rc;
711
712 #
713 # Process task
714 # pylint: disable=C0111
715 #
716
717 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
718 # Construct the payload.
719 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
720 for sArg in asArgs:
721 aoPayload.append('%s' % (sArg));
722 aoPayload.append(long(len(asAddEnv)));
723 for sPutEnv in asAddEnv:
724 aoPayload.append('%s' % (sPutEnv));
725 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
726 if isinstance(o, basestring):
727 aoPayload.append(o);
728 elif o is not None:
729 aoPayload.append('|');
730 o.uTxsClientCrc32 = zlib.crc32('');
731 else:
732 aoPayload.append('');
733 aoPayload.append('%s' % (sAsUser));
734 aoPayload.append(long(self.cMsTimeout));
735
736 # Kick of the EXEC command.
737 rc = self.sendMsg('EXEC', aoPayload)
738 if rc is True:
739 rc = self.recvAckLogged('EXEC');
740 if rc is True:
741 # Loop till the process completes, feed input to the TXS and
742 # receive output from it.
743 sFailure = "";
744 msPendingInputReply = None;
745 cbMsg, sOpcode, abPayload = (None, None, None);
746 while True:
747 # Pending input?
748 if msPendingInputReply is None \
749 and oStdIn is not None \
750 and not isinstance(oStdIn, basestring):
751 try:
752 abInput = oStdIn.read(65536);
753 except:
754 reporter.errorXcpt('read standard in');
755 sFailure = 'exception reading stdin';
756 rc = None;
757 break;
758 if len(abInput) > 0:
759 oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
760 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
761 if rc is not True:
762 sFailure = 'sendMsg failure';
763 break;
764 msPendingInputReply = base.timestampMilli();
765 continue;
766
767 # Wait for input (500 ms timeout).
768 if cbMsg is None:
769 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
770 if cbMsg == None:
771 # Check for time out before restarting the loop.
772 # Note! Only doing timeout checking here does mean that
773 # the TXS may prevent us from timing out by
774 # flooding us with data. This is unlikely though.
775 if self.hasTimedOut() \
776 and ( msPendingInputReply is None \
777 or base.timestampMilli() - msPendingInputReply > 30000):
778 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
779 sFailure = 'timeout';
780 rc = None;
781 break;
782 # Check that the connection is OK.
783 if not self.oTransport.isConnectionOk():
784 self.oTransport.disconnect();
785 sFailure = 'disconnected';
786 rc = False;
787 break;
788 continue;
789
790 # Handle the response.
791 sOpcode = sOpcode.rstrip();
792 if sOpcode == 'STDOUT':
793 oOut = oStdOut;
794 elif sOpcode == 'STDERR':
795 oOut = oStdErr;
796 elif sOpcode == 'TESTPIPE':
797 oOut = oTestPipe;
798 else:
799 oOut = None;
800 if oOut is not None:
801 # Output from the process.
802 if len(abPayload) < 4:
803 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
804 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
805 rc = None;
806 break;
807 uStreamCrc32 = getU32(abPayload, 0);
808 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
809 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
810 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
811 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
812 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
813 rc = None;
814 break;
815 try:
816 oOut.write(abPayload[4:]);
817 except:
818 sFailure = 'exception writing %s' % (sOpcode);
819 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
820 rc = None;
821 break;
822 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
823 # Standard input is ignored. Ignore this condition for now.
824 msPendingInputReply = None;
825 reporter.log('taskExecEx: Standard input is ignored... why?');
826 del oStdIn.uTxsClientCrc32;
827 oStdIn = '/dev/null';
828 elif (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
829 and msPendingInputReply is not None:
830 # TXS STDIN error, abort.
831 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
832 msPendingInputReply = None;
833 sFailure = 'TXS is out of memory for std input buffering';
834 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
835 rc = None;
836 break;
837 elif sOpcode == 'ACK' and msPendingInputReply is not None:
838 msPendingInputReply = None;
839 elif sOpcode.startswith('PROC '):
840 # Process status message, handle it outside the loop.
841 rc = True;
842 break;
843 else:
844 sFailure = 'Unexpected opcode %s' % (sOpcode);
845 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
846 rc = None;
847 break;
848 # Clear the message.
849 cbMsg, sOpcode, abPayload = (None, None, None);
850
851 # If we sent an STDIN packet and didn't get a reply yet, we'll give
852 # TXS some 5 seconds to reply to this. If we don't wait here we'll
853 # get screwed later on if we mix it up with the reply to some other
854 # command. Hackish.
855 if msPendingInputReply is not None:
856 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
857 if cbMsg2 is not None:
858 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
859 % (cbMsg2, sOpcode2, abPayload2));
860 msPendingInputReply = None;
861 else:
862 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
863 self.fScrewedUpMsgState = True;
864
865 # Parse the exit status (True), abort (None) or do nothing (False).
866 if rc is True:
867 if sOpcode == 'PROC OK':
868 rc = True;
869 else:
870 # Do proper parsing some other day if needed:
871 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
872 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
873 rc = False;
874 else:
875 if rc is None:
876 # Abort it.
877 reporter.log('taskExecEx: sending ABORT...');
878 rc = self.sendMsg('ABORT');
879 while rc is True:
880 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
881 if cbMsg is None:
882 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
883 self.fScrewedUpMsgState = True;
884 break;
885 if sOpcode.startswith('PROC '):
886 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
887 break;
888 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
889 # Check that the connection is OK before looping.
890 if not self.oTransport.isConnectionOk():
891 self.oTransport.disconnect();
892 break;
893
894 # Fake response with the reason why we quit.
895 if sFailure is not None:
896 self.t3oReply = (0, 'EXECFAIL', sFailure);
897 rc = None;
898 else:
899 rc = None;
900
901 # Cleanup.
902 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
903 if o is not None and not isinstance(o, basestring):
904 del o.uTxsClientCrc32; # pylint: disable=E1103
905 reporter.log('taskExecEx: returns %s' % (rc));
906 return rc;
907
908 #
909 # Admin tasks
910 #
911
912 def hlpRebootShutdownWaitForAck(self, sCmd):
913 """Wait for reboot/shutodwn ACK."""
914 rc = self.recvAckLogged(sCmd);
915 if rc is True:
916 # poll a little while for server to disconnect.
917 uMsStart = base.timestampMilli();
918 while self.oTransport.isConnectionOk() \
919 and base.timestampMilli() - uMsStart >= 5000:
920 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
921 break;
922 self.oTransport.disconnect();
923 return rc;
924
925 def taskReboot(self):
926 rc = self.sendMsg('REBOOT');
927 if rc is True:
928 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
929 return rc;
930
931 def taskShutdown(self):
932 rc = self.sendMsg('SHUTDOWN');
933 if rc is True:
934 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
935 return rc;
936
937 #
938 # CD/DVD control tasks.
939 #
940
941 ## TODO
942
943 #
944 # File system tasks
945 #
946
947 def taskMkDir(self, sRemoteDir, fMode):
948 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
949 if rc is True:
950 rc = self.recvAckLogged('MKDIR');
951 return rc;
952
953 def taskMkDirPath(self, sRemoteDir, fMode):
954 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
955 if rc is True:
956 rc = self.recvAckLogged('MKDRPATH');
957 return rc;
958
959 def taskMkSymlink(self, sLinkTarget, sLink):
960 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
961 if rc is True:
962 rc = self.recvAckLogged('MKSYMLNK');
963 return rc;
964
965 def taskRmDir(self, sRemoteDir):
966 rc = self.sendMsg('RMDIR', (sRemoteDir,));
967 if rc is True:
968 rc = self.recvAckLogged('RMDIR');
969 return rc;
970
971 def taskRmFile(self, sRemoteFile):
972 rc = self.sendMsg('RMFILE', (sRemoteFile,));
973 if rc is True:
974 rc = self.recvAckLogged('RMFILE');
975 return rc;
976
977 def taskRmSymlink(self, sRemoteSymlink):
978 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
979 if rc is True:
980 rc = self.recvAckLogged('RMSYMLNK');
981 return rc;
982
983 def taskRmTree(self, sRemoteTree):
984 rc = self.sendMsg('RMTREE', (sRemoteTree,));
985 if rc is True:
986 rc = self.recvAckLogged('RMTREE');
987 return rc;
988
989 #def "CHMOD "
990 #def "CHOWN "
991 #def "CHGRP "
992
993 def taskIsDir(self, sRemoteDir):
994 rc = self.sendMsg('ISDIR', (sRemoteDir,));
995 if rc is True:
996 rc = self.recvTrueFalse('ISDIR');
997 return rc;
998
999 def taskIsFile(self, sRemoteFile):
1000 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1001 if rc is True:
1002 rc = self.recvTrueFalse('ISFILE');
1003 return rc;
1004
1005 def taskIsSymlink(self, sRemoteSymlink):
1006 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1007 if rc is True:
1008 rc = self.recvTrueFalse('ISSYMLNK');
1009 return rc;
1010
1011 #def "STAT "
1012 #def "LSTAT "
1013 #def "LIST "
1014
1015 def taskUploadFile(self, sLocalFile, sRemoteFile):
1016 #
1017 # Open the local file (make sure it exist before bothering TXS) and
1018 # tell TXS that we want to upload a file.
1019 #
1020 try:
1021 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1022 except:
1023 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1024 return False;
1025
1026 # Common cause with taskUploadStr
1027 rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
1028
1029 # Cleanup.
1030 oLocalFile.close();
1031 return rc;
1032
1033 def taskUploadString(self, sContent, sRemoteFile):
1034 # Wrap sContent in a file like class.
1035 class InStringFile(object): # pylint: disable=R0903
1036 def __init__(self, sContent):
1037 self.sContent = sContent;
1038 self.off = 0;
1039
1040 def read(self, cbMax):
1041 cbLeft = len(self.sContent) - self.off;
1042 if cbLeft == 0:
1043 return "";
1044 if cbLeft <= cbMax:
1045 sRet = self.sContent[self.off:(self.off + cbLeft)];
1046 else:
1047 sRet = self.sContent[self.off:(self.off + cbMax)];
1048 self.off = self.off + len(sRet);
1049 return sRet;
1050
1051 oLocalString = InStringFile(sContent);
1052 return self.taskUploadCommon(oLocalString, sRemoteFile);
1053
1054 def taskUploadCommon(self, oLocalFile, sRemoteFile):
1055 """Common worker used by taskUploadFile and taskUploadString."""
1056 # Command + ACK.
1057 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1058 if rc is True:
1059 rc = self.recvAckLogged('PUT FILE');
1060 if rc is True:
1061 #
1062 # Push data packets until eof.
1063 #
1064 uMyCrc32 = zlib.crc32("");
1065 while True:
1066 # Read up to 64 KB of data.
1067 try:
1068 sRaw = oLocalFile.read(65536);
1069 except:
1070 rc = None;
1071 break;
1072
1073 # Convert to array - this is silly!
1074 abBuf = array.array('B');
1075 for i in range(len(sRaw)):
1076 abBuf.append(ord(sRaw[i]));
1077 sRaw = None;
1078
1079 # Update the file stream CRC and send it off.
1080 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1081 if len(abBuf) == 0:
1082 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1083 else:
1084 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1085 if rc is False:
1086 break;
1087
1088 # Wait for the reply.
1089 rc = self.recvAck();
1090 if rc is not True:
1091 if rc is False:
1092 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1093 else:
1094 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1095 rc = False;
1096 break;
1097
1098 # EOF?
1099 if len(abBuf) == 0:
1100 break;
1101
1102 # Send ABORT on ACK and I/O errors.
1103 if rc is None:
1104 rc = self.sendMsg('ABORT');
1105 if rc is True:
1106 self.recvAckLogged('ABORT');
1107 rc = False;
1108 return rc;
1109
1110 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1111 try:
1112 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1113 except:
1114 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1115 return False;
1116
1117 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1118
1119 oLocalFile.close();
1120 if rc is False:
1121 try:
1122 os.remove(sLocalFile);
1123 except:
1124 reporter.errorXcpt();
1125 return rc;
1126
1127 def taskDownloadString(self, sRemoteFile):
1128 # Wrap sContent in a file like class.
1129 class OutStringFile(object): # pylint: disable=R0903
1130 def __init__(self):
1131 self.asContent = [];
1132
1133 def write(self, sBuf):
1134 self.asContent.append(sBuf);
1135 return None;
1136
1137 oLocalString = OutStringFile();
1138 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1139 if rc is True:
1140 if len(oLocalString.asContent) == 0:
1141 rc = '';
1142 else:
1143 rc = ''.join(oLocalString.asContent);
1144 return rc;
1145
1146 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1147 """Common worker for taskDownloadFile and taskDownloadString."""
1148 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1149 if rc is True:
1150 #
1151 # Process data packets until eof.
1152 #
1153 uMyCrc32 = zlib.crc32("");
1154 while rc is True:
1155 cbMsg, sOpcode, abPayload = self.recvReply();
1156 if cbMsg is None:
1157 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1158 rc = None;
1159 break;
1160
1161 # Validate.
1162 sOpcode = sOpcode.rstrip();
1163 if sOpcode != 'DATA' and sOpcode != 'DATA EOF':
1164 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1165 % (sOpcode, getSZ(abPayload, 0, "None")));
1166 rc = False;
1167 break;
1168 if sOpcode == 'DATA' and len(abPayload) < 4:
1169 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1170 rc = None;
1171 break;
1172 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1173 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1174 rc = None;
1175 break;
1176
1177 # Check the CRC (common for both packets).
1178 uCrc32 = getU32(abPayload, 0);
1179 if sOpcode == 'DATA':
1180 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1181 if uCrc32 != (uMyCrc32 & 0xffffffff):
1182 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1183 % (hex(uMyCrc32), hex(uCrc32)));
1184 rc = None;
1185 break;
1186 if sOpcode == 'DATA EOF':
1187 rc = self.sendMsg('ACK');
1188 break;
1189
1190 # Finally, push the data to the file.
1191 try:
1192 oLocalFile.write(abPayload[4:].tostring());
1193 except:
1194 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1195 rc = None;
1196 break;
1197 rc = self.sendMsg('ACK');
1198
1199 # Send NACK on validation and I/O errors.
1200 if rc is None:
1201 rc = self.sendMsg('NACK');
1202 rc = False;
1203 return rc;
1204
1205 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1206 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1207 if rc is True:
1208 rc = self.recvAckLogged('UNPKFILE');
1209 return rc;
1210
1211 # pylint: enable=C0111
1212
1213
1214 #
1215 # Public methods - generic task queries
1216 #
1217
1218 def isSuccess(self):
1219 """Returns True if the task completed successfully, otherwise False."""
1220 self.lockTask();
1221 sStatus = self.sStatus;
1222 oTaskRc = self.oTaskRc;
1223 self.unlockTask();
1224 if sStatus != "":
1225 return False;
1226 if oTaskRc is False or oTaskRc is None:
1227 return False;
1228 return True;
1229
1230 def getResult(self):
1231 """
1232 Returns the result of a completed task.
1233 Returns None if not completed yet or no previous task.
1234 """
1235 self.lockTask();
1236 sStatus = self.sStatus;
1237 oTaskRc = self.oTaskRc;
1238 self.unlockTask();
1239 if sStatus != "":
1240 return None;
1241 return oTaskRc;
1242
1243 def getLastReply(self):
1244 """
1245 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1246 Returns a None, None, None three-tuple if there was no last reply.
1247 """
1248 self.lockTask();
1249 t3oReply = self.t3oReply;
1250 self.unlockTask();
1251 return t3oReply;
1252
1253 #
1254 # Public methods - connection.
1255 #
1256
1257 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1258 """
1259 Initiates a disconnect task.
1260
1261 Returns True on success, False on failure (logged).
1262
1263 The task returns True on success and False on failure.
1264 """
1265 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1266
1267 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1268 """Synchronous version."""
1269 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1270
1271 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1272 """
1273 Initiates a task for getting the TXS UUID.
1274
1275 Returns True on success, False on failure (logged).
1276
1277 The task returns UUID string (in {}) on success and False on failure.
1278 """
1279 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskUuid);
1280
1281 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1282 """Synchronous version."""
1283 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1284
1285 #
1286 # Public methods - execution.
1287 #
1288
1289 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1290 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1291 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1292 """
1293 Initiates a exec process task.
1294
1295 Returns True on success, False on failure (logged).
1296
1297 The task returns True if the process exited normally with status code 0.
1298 The task returns None if on failure prior to executing the process, and
1299 False if the process exited with a different status or in an abnormal
1300 manner. Both None and False are logged of course and further info can
1301 also be obtained by getLastReply().
1302
1303 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1304 these streams. If None, no special action is taken and the output goes
1305 to where ever the TXS sends its output, and ditto for input.
1306 - To send to / read from the bitbucket, pass '/dev/null'.
1307 - To redirect to/from a file, just specify the remote filename.
1308 - To append to a file use '>>' followed by the remote filename.
1309 - To pipe the stream to/from the TXS, specify a file like
1310 object. For StdIn a non-blocking read() method is required. For
1311 the other a write() method is required. Watch out for deadlock
1312 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1313 """
1314 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1315 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1316 oStdOut, oStdErr, oTestPipe, sAsUser));
1317
1318 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1319 oStdIn = '/dev/null', oStdOut = '/dev/null',
1320 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1321 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1322 """Synchronous version."""
1323 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1324 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1325
1326 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1327 cMsTimeout = 3600000, fIgnoreErrors = False):
1328 """
1329 Initiates a exec process test task.
1330
1331 Returns True on success, False on failure (logged).
1332
1333 The task returns True if the process exited normally with status code 0.
1334 The task returns None if on failure prior to executing the process, and
1335 False if the process exited with a different status or in an abnormal
1336 manner. Both None and False are logged of course and further info can
1337 also be obtained by getLastReply().
1338
1339 Standard in is taken from /dev/null. While both standard output and
1340 standard error goes directly to reporter.log(). The testpipe is piped
1341 to reporter.xxxx.
1342 """
1343
1344 sStdIn = '/dev/null';
1345 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1346 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1347 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1348 else: oTestPipe = '/dev/null';
1349
1350 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1351 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1352
1353 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1354 cMsTimeout = 3600000, fIgnoreErrors = False):
1355 """Synchronous version."""
1356 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1357 cMsTimeout, fIgnoreErrors);
1358
1359 #
1360 # Public methods - file system
1361 #
1362
1363 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1364 """
1365 Initiates a reboot task.
1366
1367 Returns True on success, False on failure (logged).
1368
1369 The task returns True on success, False on failure (logged). The
1370 session will be disconnected on successful task completion.
1371 """
1372 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1373
1374 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1375 """Synchronous version."""
1376 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1377
1378 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1379 """
1380 Initiates a shutdown task.
1381
1382 Returns True on success, False on failure (logged).
1383
1384 The task returns True on success, False on failure (logged).
1385 """
1386 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1387
1388 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1389 """Synchronous version."""
1390 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1391
1392
1393 #
1394 # Public methods - file system
1395 #
1396
1397 def asyncMkDir(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1398 """
1399 Initiates a mkdir task.
1400
1401 Returns True on success, False on failure (logged).
1402
1403 The task returns True on success, False on failure (logged).
1404 """
1405 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1406
1407 def syncMkDir(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1408 """Synchronous version."""
1409 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1410
1411 def asyncMkDirPath(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1412 """
1413 Initiates a mkdir -p task.
1414
1415 Returns True on success, False on failure (logged).
1416
1417 The task returns True on success, False on failure (logged).
1418 """
1419 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1420
1421 def syncMkDirPath(self, sRemoteDir, fMode = 0700, cMsTimeout = 30000, fIgnoreErrors = False):
1422 """Synchronous version."""
1423 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1424
1425 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1426 """
1427 Initiates a symlink task.
1428
1429 Returns True on success, False on failure (logged).
1430
1431 The task returns True on success, False on failure (logged).
1432 """
1433 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1434
1435 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1436 """Synchronous version."""
1437 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1438
1439 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1440 """
1441 Initiates a rmdir task.
1442
1443 Returns True on success, False on failure (logged).
1444
1445 The task returns True on success, False on failure (logged).
1446 """
1447 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1448
1449 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1450 """Synchronous version."""
1451 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1452
1453 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1454 """
1455 Initiates a rmfile task.
1456
1457 Returns True on success, False on failure (logged).
1458
1459 The task returns True on success, False on failure (logged).
1460 """
1461 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1462
1463 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1464 """Synchronous version."""
1465 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1466
1467 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1468 """
1469 Initiates a rmsymlink task.
1470
1471 Returns True on success, False on failure (logged).
1472
1473 The task returns True on success, False on failure (logged).
1474 """
1475 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1476
1477 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1478 """Synchronous version."""
1479 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1480
1481 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1482 """
1483 Initiates a rmtree task.
1484
1485 Returns True on success, False on failure (logged).
1486
1487 The task returns True on success, False on failure (logged).
1488 """
1489 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1490
1491 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1492 """Synchronous version."""
1493 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1494
1495 #def "CHMOD "
1496 #def "CHOWN "
1497 #def "CHGRP "
1498
1499 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1500 """
1501 Initiates a is-dir query task.
1502
1503 Returns True on success, False on failure (logged).
1504
1505 The task returns True if it's a directory, False if it isn't, and
1506 None on error (logged).
1507 """
1508 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1509
1510 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1511 """Synchronous version."""
1512 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1513
1514 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1515 """
1516 Initiates a is-file query task.
1517
1518 Returns True on success, False on failure (logged).
1519
1520 The task returns True if it's a file, False if it isn't, and None on
1521 error (logged).
1522 """
1523 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1524
1525 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1526 """Synchronous version."""
1527 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1528
1529 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1530 """
1531 Initiates a is-symbolic-link query task.
1532
1533 Returns True on success, False on failure (logged).
1534
1535 The task returns True if it's a symbolic linke, False if it isn't, and
1536 None on error (logged).
1537 """
1538 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1539
1540 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1541 """Synchronous version."""
1542 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1543
1544 #def "STAT "
1545 #def "LSTAT "
1546 #def "LIST "
1547
1548 def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1549 """
1550 Initiates a download query task.
1551
1552 Returns True on success, False on failure (logged).
1553
1554 The task returns True on success, False on failure (logged).
1555 """
1556 return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, (sLocalFile, sRemoteFile));
1557
1558 def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1559 """Synchronous version."""
1560 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1561
1562 def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1563 """
1564 Initiates a upload string task.
1565
1566 Returns True on success, False on failure (logged).
1567
1568 The task returns True on success, False on failure (logged).
1569 """
1570 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, (sContent, sRemoteFile));
1571
1572 def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1573 """Synchronous version."""
1574 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1575
1576 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1577 """
1578 Initiates a download file task.
1579
1580 Returns True on success, False on failure (logged).
1581
1582 The task returns True on success, False on failure (logged).
1583 """
1584 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1585
1586 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1587 """Synchronous version."""
1588 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1589
1590 def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1591 """
1592 Initiates a download string task.
1593
1594 Returns True on success, False on failure (logged).
1595
1596 The task returns a byte string on success, False on failure (logged).
1597 """
1598 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, (sRemoteFile,));
1599
1600 def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1601 """Synchronous version."""
1602 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, cMsTimeout, fIgnoreErrors);
1603
1604 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1605 """
1606 Initiates a unpack file task.
1607
1608 Returns True on success, False on failure (logged).
1609
1610 The task returns True on success, False on failure (logged).
1611 """
1612 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, \
1613 (sRemoteFile, sRemoteDir));
1614
1615 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1616 """Synchronous version."""
1617 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1618
1619
1620class TransportTcp(TransportBase):
1621 """
1622 TCP transport layer for the TXS client session class.
1623 """
1624
1625 def __init__(self, sHostname, uPort, fReversedSetup):
1626 """
1627 Save the parameters. The session will call us back to make the
1628 connection later on its worker thread.
1629 """
1630 TransportBase.__init__(self, utils.getCallerName());
1631 self.sHostname = sHostname;
1632 self.fReversedSetup = fReversedSetup;
1633 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1634 self.oSocket = None;
1635 self.oWakeupW = None;
1636 self.oWakeupR = None;
1637 self.fConnectCanceled = False;
1638 self.fIsConnecting = False;
1639 self.oCv = threading.Condition();
1640 self.abReadAhead = array.array('B');
1641
1642 def toString(self):
1643 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1644 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1645 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1646 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1647
1648 def __isInProgressXcpt(self, oXcpt):
1649 """ In progress exception? """
1650 try:
1651 if isinstance(oXcpt, socket.error):
1652 try:
1653 if oXcpt[0] == errno.EINPROGRESS:
1654 return True;
1655 except: pass;
1656 # Windows?
1657 try:
1658 if oXcpt[0] == errno.EWOULDBLOCK:
1659 return True;
1660 except: pass;
1661 except:
1662 pass;
1663 return False;
1664
1665 def __isWouldBlockXcpt(self, oXcpt):
1666 """ Would block exception? """
1667 try:
1668 if isinstance(oXcpt, socket.error):
1669 try:
1670 if oXcpt[0] == errno.EWOULDBLOCK:
1671 return True;
1672 except: pass;
1673 try:
1674 if oXcpt[0] == errno.EAGAIN:
1675 return True;
1676 except: pass;
1677 except:
1678 pass;
1679 return False;
1680
1681 def __isConnectionReset(self, oXcpt):
1682 """ Connection reset by Peer or others. """
1683 try:
1684 if isinstance(oXcpt, socket.error):
1685 try:
1686 if oXcpt[0] == errno.ECONNRESET:
1687 return True;
1688 except: pass;
1689 try:
1690 if oXcpt[0] == errno.ENETRESET:
1691 return True;
1692 except: pass;
1693 except:
1694 pass;
1695 return False;
1696
1697 def _closeWakeupSockets(self):
1698 """ Closes the wakup sockets. Caller should own the CV. """
1699 oWakeupR = self.oWakeupR;
1700 self.oWakeupR = None;
1701 if oWakeupR is not None:
1702 oWakeupR.close();
1703
1704 oWakeupW = self.oWakeupW;
1705 self.oWakeupW = None;
1706 if oWakeupW is not None:
1707 oWakeupW.close();
1708
1709 return None;
1710
1711 def cancelConnect(self):
1712 # This is bad stuff.
1713 self.oCv.acquire();
1714 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1715 self.fConnectCanceled = True;
1716 if self.fIsConnecting:
1717 oSocket = self.oSocket;
1718 self.oSocket = None;
1719 if oSocket is not None:
1720 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1721 oSocket.close();
1722
1723 oWakeupW = self.oWakeupW;
1724 self.oWakeupW = None;
1725 if oWakeupW is not None:
1726 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1727 try: oWakeupW.send('cancelled!\n');
1728 except: reporter.logXcpt();
1729 try: oWakeupW.shutdown(socket.SHUT_WR);
1730 except: reporter.logXcpt();
1731 oWakeupW.close();
1732 self.oCv.release();
1733
1734 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1735 """ Connects to the TXS server as server, i.e. the reversed setup. """
1736 assert(self.fReversedSetup);
1737
1738 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1739
1740 # Workaround for bind() failure...
1741 try:
1742 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1743 except:
1744 reporter.errorXcpt('socket.listen(1) failed');
1745 return None;
1746
1747 # Bind the socket and make it listen.
1748 try:
1749 oSocket.bind((self.sHostname, self.uPort));
1750 except:
1751 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1752 return None;
1753 try:
1754 oSocket.listen(1);
1755 except:
1756 reporter.errorXcpt('socket.listen(1) failed');
1757 return None;
1758
1759 # Accept connections.
1760 oClientSocket = None;
1761 tClientAddr = None;
1762 try:
1763 (oClientSocket, tClientAddr) = oSocket.accept();
1764 except socket.error, e:
1765 if not self.__isInProgressXcpt(e):
1766 raise;
1767
1768 # Do the actual waiting.
1769 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1770 try:
1771 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1772 except socket.error, e:
1773 if e[0] != errno.EBADF or not self.fConnectCanceled:
1774 raise;
1775 reporter.log('socket.select() on accept was canceled');
1776 return None;
1777 except:
1778 reporter.logXcpt('socket.select() on accept');
1779
1780 # Try accept again.
1781 try:
1782 (oClientSocket, tClientAddr) = oSocket.accept();
1783 except socket.error, e:
1784 if not self.__isInProgressXcpt(e):
1785 if e[0] != errno.EBADF or not self.fConnectCanceled:
1786 raise;
1787 reporter.log('socket.accept() was canceled');
1788 return None;
1789 reporter.log('socket.accept() timed out');
1790 return False;
1791 except:
1792 reporter.errorXcpt('socket.accept() failed');
1793 return None;
1794 except:
1795 reporter.errorXcpt('socket.accept() failed');
1796 return None;
1797
1798 # Store the connected socket and throw away the server socket.
1799 self.oCv.acquire();
1800 if not self.fConnectCanceled:
1801 self.oSocket.close();
1802 self.oSocket = oClientSocket;
1803 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1804 self.oCv.release();
1805 return True;
1806
1807 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1808 """ Connects to the TXS server as client. """
1809 assert(not self.fReversedSetup);
1810
1811 # Connect w/ timeouts.
1812 rc = None;
1813 try:
1814 oSocket.connect((self.sHostname, self.uPort));
1815 rc = True;
1816 except socket.error, e:
1817 iRc = e[0];
1818 if self.__isInProgressXcpt(e):
1819 # Do the actual waiting.
1820 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (e,));
1821 try:
1822 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1823 if len(ttRc[1]) + len(ttRc[2]) == 0:
1824 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1825 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1826 rc = iRc == 0;
1827 except socket.error, e:
1828 iRc = e[0];
1829 except:
1830 iRc = -42;
1831 reporter.fatalXcpt('socket.select() on connect failed');
1832
1833 if rc is True:
1834 pass;
1835 elif iRc == errno.ECONNREFUSED \
1836 or iRc == errno.EHOSTUNREACH \
1837 or iRc == errno.EINTR \
1838 or iRc == errno.ENETDOWN \
1839 or iRc == errno.ENETUNREACH \
1840 or iRc == errno.ETIMEDOUT:
1841 rc = False; # try again.
1842 else:
1843 if iRc != errno.EBADF or not self.fConnectCanceled:
1844 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
1845 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
1846 except:
1847 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
1848 return rc;
1849
1850
1851 def connect(self, cMsTimeout):
1852 # Create a non-blocking socket.
1853 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
1854 try:
1855 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
1856 except:
1857 reporter.fatalXcpt('socket.socket() failed');
1858 return None;
1859 try:
1860 oSocket.setblocking(0);
1861 except:
1862 oSocket.close();
1863 reporter.fatalXcpt('socket.socket() failed');
1864 return None;
1865
1866 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
1867 oWakeupR = None;
1868 oWakeupW = None;
1869 if hasattr(socket, 'socketpair'):
1870 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=E1101
1871 except: reporter.logXcpt('socket.socketpair() failed');
1872
1873 # Update the state.
1874 self.oCv.acquire();
1875 rc = None;
1876 if not self.fConnectCanceled:
1877 self.oSocket = oSocket;
1878 self.oWakeupW = oWakeupW;
1879 self.oWakeupR = oWakeupR;
1880 self.fIsConnecting = True;
1881 self.oCv.release();
1882
1883 # Try connect.
1884 if oWakeupR is None:
1885 oWakeupR = oSocket; # Avoid select failure.
1886 if self.fReversedSetup:
1887 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
1888 else:
1889 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
1890 oSocket = None;
1891
1892 # Update the state and cleanup on failure/cancel.
1893 self.oCv.acquire();
1894 if rc is True and self.fConnectCanceled:
1895 rc = False;
1896 self.fIsConnecting = False;
1897
1898 if rc is not True:
1899 if self.oSocket is not None:
1900 self.oSocket.close();
1901 self.oSocket = None;
1902 self._closeWakeupSockets();
1903 self.oCv.release();
1904
1905 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
1906 return rc;
1907
1908 def disconnect(self, fQuiet = False):
1909 if self.oSocket is not None:
1910 self.abReadAhead = array.array('B');
1911
1912 # Try a shutting down the socket gracefully (draining it).
1913 try:
1914 self.oSocket.shutdown(socket.SHUT_WR);
1915 except:
1916 if not fQuiet:
1917 reporter.error('shutdown(SHUT_WR)');
1918 try:
1919 self.oSocket.setblocking(0); # just in case it's not set.
1920 sData = "1";
1921 while len(sData) > 0:
1922 sData = self.oSocket.recv(16384);
1923 except:
1924 pass;
1925
1926 # Close it.
1927 self.oCv.acquire();
1928 try: self.oSocket.setblocking(1);
1929 except: pass;
1930 self.oSocket.close();
1931 self.oSocket = None;
1932 else:
1933 self.oCv.acquire();
1934 self._closeWakeupSockets();
1935 self.oCv.release();
1936
1937 def sendBytes(self, abMsg, cMsTimeout):
1938 if self.oSocket is None:
1939 reporter.error('TransportTcp.sendBytes: No connection.');
1940 return False;
1941
1942 # Try send it all.
1943 try:
1944 cbSent = self.oSocket.send(abMsg);
1945 if cbSent == len(abMsg):
1946 return True;
1947 except Exception, oXcpt:
1948 if not self.__isWouldBlockXcpt(oXcpt):
1949 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
1950 return False;
1951 cbSent = 0;
1952
1953 # Do a timed send.
1954 msStart = base.timestampMilli();
1955 while True:
1956 cMsElapsed = base.timestampMilli() - msStart;
1957 if cMsElapsed > cMsTimeout:
1958 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abMsg)));
1959 break;
1960
1961 # wait.
1962 try:
1963 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
1964 if len(ttRc[2]) > 0 and len(ttRc[1]) == 0:
1965 reporter.error('TranportTcp.sendBytes: select returned with exception');
1966 break;
1967 if len(ttRc[1]) == 0:
1968 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abMsg)));
1969 break;
1970 except:
1971 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
1972 break;
1973
1974 # Try send more.
1975 try:
1976 cbSent += self.oSocket.send(abMsg[cbSent:]);
1977 if cbSent == len(abMsg):
1978 return True;
1979 except Exception, oXcpt:
1980 if not self.__isWouldBlockXcpt(oXcpt):
1981 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abMsg)));
1982 break;
1983
1984 return False;
1985
1986 def __returnReadAheadBytes(self, cb):
1987 """ Internal worker for recvBytes. """
1988 assert(len(self.abReadAhead) >= cb);
1989 abRet = self.abReadAhead[:cb];
1990 self.abReadAhead = self.abReadAhead[cb:];
1991 return abRet;
1992
1993 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
1994 if self.oSocket is None:
1995 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
1996 return None;
1997
1998 # Try read in some more data without bothering with timeout handling first.
1999 if len(self.abReadAhead) < cb:
2000 try:
2001 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2002 if len(abBuf) > 0:
2003 self.abReadAhead.extend(array.array('B', abBuf));
2004 except Exception, oXcpt:
2005 if not self.__isWouldBlockXcpt(oXcpt):
2006 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2007 return None;
2008
2009 if len(self.abReadAhead) >= cb:
2010 return self.__returnReadAheadBytes(cb);
2011
2012 # Timeout loop.
2013 msStart = base.timestampMilli();
2014 while True:
2015 cMsElapsed = base.timestampMilli() - msStart;
2016 if cMsElapsed > cMsTimeout:
2017 if not fNoDataOk or len(self.abReadAhead) > 0:
2018 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2019 break;
2020
2021 # Wait.
2022 try:
2023 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2024 if len(ttRc[2]) > 0 and len(ttRc[0]) == 0:
2025 reporter.error('TranportTcp.recvBytes: select returned with exception');
2026 break;
2027 if len(ttRc[0]) == 0:
2028 if not fNoDataOk or len(self.abReadAhead) > 0:
2029 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2030 % (len(self.abReadAhead), cb, fNoDataOk));
2031 break;
2032 except:
2033 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2034 break;
2035
2036 # Try read more.
2037 try:
2038 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2039 if len(abBuf) == 0:
2040 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2041 % (len(self.abReadAhead), cb, fNoDataOk));
2042 self.disconnect();
2043 return None;
2044
2045 self.abReadAhead.extend(array.array('B', abBuf));
2046
2047 except Exception, oXcpt:
2048 reporter.log('recv => exception %s' % (oXcpt,));
2049 if not self.__isWouldBlockXcpt(oXcpt):
2050 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or len(self.abReadAhead) > 0:
2051 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2052 break;
2053
2054 # Done?
2055 if len(self.abReadAhead) >= cb:
2056 return self.__returnReadAheadBytes(cb);
2057
2058 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2059 return None;
2060
2061 def isConnectionOk(self):
2062 if self.oSocket is None:
2063 return False;
2064 try:
2065 ttRc = select.select([], [], [self.oSocket], 0.0);
2066 if len(ttRc[2]) > 0:
2067 return False;
2068
2069 self.oSocket.send(array.array('B')); # send zero bytes.
2070 except:
2071 return False;
2072 return True;
2073
2074 def isRecvPending(self, cMsTimeout = 0):
2075 try:
2076 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2077 if len(ttRc[0]) == 0:
2078 return False;
2079 except:
2080 pass;
2081 return True;
2082
2083
2084def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2085 """
2086 Opens a connection to a Test Execution Service via TCP, given its name.
2087 """
2088 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % \
2089 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2090 try:
2091 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2092 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge);
2093 except:
2094 reporter.errorXcpt(None, 15);
2095 return None;
2096 return oSession;
2097
2098
2099def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2100 """
2101 Tries to open a connection to a Test Execution Service via TCP, given its name.
2102
2103 This differs from openTcpSession in that it won't log a connection failure
2104 as an error.
2105 """
2106 try:
2107 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2108 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True);
2109 except:
2110 reporter.errorXcpt(None, 15);
2111 return None;
2112 return oSession;
2113
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette