VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py@ 70521

Last change on this file since 70521 was 70521, checked in by vboxsync, 7 years ago

ValidationKit: More python 3 adjustments.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 79.7 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 70521 2018-01-10 15:49:10Z vboxsync $
3# pylint: disable=C0302
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2017 Oracle Corporation
11
12This file is part of VirtualBox Open Source Edition (OSE), as
13available from http://www.virtualbox.org. This file is free software;
14you can redistribute it and/or modify it under the terms of the GNU
15General Public License (GPL) as published by the Free Software
16Foundation, in version 2 as it comes in the "COPYING" file of the
17VirtualBox OSE distribution. VirtualBox OSE is distributed in the
18hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
19
20The contents of this file may alternatively be used under the terms
21of the Common Development and Distribution License Version 1.0
22(CDDL) only, as it comes in the "COPYING.CDDL" file of the
23VirtualBox OSE distribution, in which case the provisions of the
24CDDL are applicable instead of those of the GPL.
25
26You may elect to license modified versions of this file under the
27terms and conditions of either the GPL or the CDDL or both.
28"""
29__version__ = "$Revision: 70521 $"
30
31# Standard Python imports.
32import array;
33import errno;
34import os;
35import select;
36import socket;
37import sys;
38import threading;
39import time;
40import types;
41import zlib;
42import uuid;
43
44# Validation Kit imports.
45from common import utils;
46from testdriver import base;
47from testdriver import reporter;
48from testdriver.base import TdTaskBase;
49
50# Python 3 hacks:
51if sys.version_info[0] >= 3:
52 long = int; # pylint: disable=redefined-builtin,invalid-name
53
54#
55# Helpers for decoding data received from the TXS.
56# These are used both the Session and Transport classes.
57#
58
59def getU32(abData, off):
60 """Get a U32 field."""
61 return abData[off] \
62 + abData[off + 1] * 256 \
63 + abData[off + 2] * 65536 \
64 + abData[off + 3] * 16777216;
65
66def getSZ(abData, off, sDefault = None):
67 """
68 Get a zero-terminated string field.
69 Returns sDefault if the string is invalid.
70 """
71 cchStr = getSZLen(abData, off);
72 if cchStr >= 0:
73 abStr = abData[off:(off + cchStr)];
74 try:
75 return abStr.tostring().decode('utf_8');
76 except:
77 reporter.errorXcpt('getSZ(,%u)' % (off));
78 return sDefault;
79
80def getSZLen(abData, off):
81 """
82 Get the length of a zero-terminated string field, in bytes.
83 Returns -1 if off is beyond the data packet or not properly terminated.
84 """
85 cbData = len(abData);
86 if off >= cbData:
87 return -1;
88
89 offCur = off;
90 while abData[offCur] != 0:
91 offCur = offCur + 1;
92 if offCur >= cbData:
93 return -1;
94
95 return offCur - off;
96
97def isValidOpcodeEncoding(sOpcode):
98 """
99 Checks if the specified opcode is valid or not.
100 Returns True on success.
101 Returns False if it is invalid, details in the log.
102 """
103 sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
104 sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
105 if len(sOpcode) != 8:
106 reporter.error("invalid opcode length: %s" % (len(sOpcode)));
107 return False;
108 for i in range(0, 1):
109 if sSet1.find(sOpcode[i]) < 0:
110 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
111 return False;
112 for i in range(2, 7):
113 if sSet2.find(sOpcode[i]) < 0:
114 reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
115 return False;
116 return True;
117
118#
119# Helper for encoding data sent to the TXS.
120#
121
122def u32ToByteArray(u32):
123 """Encodes the u32 value as a little endian byte (B) array."""
124 return array.array('B', \
125 ( u32 % 256, \
126 (u32 // 256) % 256, \
127 (u32 // 65536) % 256, \
128 (u32 // 16777216) % 256) );
129
130
131
132class TransportBase(object):
133 """
134 Base class for the transport layer.
135 """
136
137 def __init__(self, sCaller):
138 self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
139 self.fDummy = 0;
140 self.abReadAheadHdr = array.array('B');
141
142 def toString(self):
143 """
144 Stringify the instance for logging and debugging.
145 """
146 return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);
147
148 def __str__(self):
149 return self.toString();
150
151 def cancelConnect(self):
152 """
153 Cancels any pending connect() call.
154 Returns None;
155 """
156 return None;
157
158 def connect(self, cMsTimeout):
159 """
160 Quietly attempts to connect to the TXS.
161
162 Returns True on success.
163 Returns False on retryable errors (no logging).
164 Returns None on fatal errors with details in the log.
165
166 Override this method, don't call super.
167 """
168 _ = cMsTimeout;
169 return False;
170
171 def disconnect(self, fQuiet = False):
172 """
173 Disconnect from the TXS.
174
175 Returns True.
176
177 Override this method, don't call super.
178 """
179 _ = fQuiet;
180 return True;
181
182 def sendBytes(self, abBuf, cMsTimeout):
183 """
184 Sends the bytes in the buffer abBuf to the TXS.
185
186 Returns True on success.
187 Returns False on failure and error details in the log.
188
189 Override this method, don't call super.
190
191 Remarks: len(abBuf) is always a multiple of 16.
192 """
193 _ = abBuf; _ = cMsTimeout;
194 return False;
195
196 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
197 """
198 Receive cb number of bytes from the TXS.
199
200 Returns the bytes (array('B')) on success.
201 Returns None on failure and error details in the log.
202
203 Override this method, don't call super.
204
205 Remarks: cb is always a multiple of 16.
206 """
207 _ = cb; _ = cMsTimeout; _ = fNoDataOk;
208 return None;
209
210 def isConnectionOk(self):
211 """
212 Checks if the connection is OK.
213
214 Returns True if it is.
215 Returns False if it isn't (caller should call diconnect).
216
217 Override this method, don't call super.
218 """
219 return True;
220
221 def isRecvPending(self, cMsTimeout = 0):
222 """
223 Checks if there is incoming bytes, optionally waiting cMsTimeout
224 milliseconds for something to arrive.
225
226 Returns True if there is, False if there isn't.
227
228 Override this method, don't call super.
229 """
230 _ = cMsTimeout;
231 return False;
232
233 def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = array.array('B')):
234 """
235 Sends a message (opcode + encoded payload).
236
237 Returns True on success.
238 Returns False on failure and error details in the log.
239 """
240 # Fix + check the opcode.
241 if len(sOpcode) < 2:
242 reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
243 return False;
244 sOpcode = sOpcode.ljust(8);
245 if not isValidOpcodeEncoding(sOpcode):
246 reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
247 return False;
248
249 # Start construct the message.
250 cbMsg = 16 + len(abPayload);
251 abMsg = array.array('B');
252 abMsg.extend(u32ToByteArray(cbMsg));
253 abMsg.extend((0, 0, 0, 0)); # uCrc32
254 try:
255 abMsg.extend(array.array('B', \
256 ( ord(sOpcode[0]), \
257 ord(sOpcode[1]), \
258 ord(sOpcode[2]), \
259 ord(sOpcode[3]), \
260 ord(sOpcode[4]), \
261 ord(sOpcode[5]), \
262 ord(sOpcode[6]), \
263 ord(sOpcode[7]) ) ) );
264 if abPayload:
265 abMsg.extend(abPayload);
266 except:
267 reporter.fatalXcpt('sendMsgInt: packing problem...');
268 return False;
269
270 # checksum it, padd it and send it off.
271 uCrc32 = zlib.crc32(abMsg[8:]);
272 abMsg[4:8] = u32ToByteArray(uCrc32);
273
274 while len(abMsg) % 16:
275 abMsg.append(0);
276
277 reporter.log2('sendMsgInt: op=%s len=%d to=%d' % (sOpcode, len(abMsg), cMsTimeout));
278 return self.sendBytes(abMsg, cMsTimeout);
279
280 def recvMsg(self, cMsTimeout, fNoDataOk = False):
281 """
282 Receives a message from the TXS.
283
284 Returns the message three-tuple: length, opcode, payload.
285 Returns (None, None, None) on failure and error details in the log.
286 """
287
288 # Read the header.
289 if self.abReadAheadHdr:
290 assert(len(self.abReadAheadHdr) == 16);
291 abHdr = self.abReadAheadHdr;
292 self.abReadAheadHdr = array.array('B');
293 else:
294 abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk);
295 if abHdr is None:
296 return (None, None, None);
297 if len(abHdr) != 16:
298 reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
299 return (None, None, None);
300
301 # Unpack and validate the header.
302 cbMsg = getU32(abHdr, 0);
303 uCrc32 = getU32(abHdr, 4);
304 sOpcode = abHdr[8:16].tostring().decode('ascii');
305
306 if cbMsg < 16:
307 reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
308 return (None, None, None);
309 if cbMsg > 1024*1024:
310 reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
311 return (None, None, None);
312 if not isValidOpcodeEncoding(sOpcode):
313 reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
314 return (None, None, None);
315
316 # Get the payload (if any), dropping the padding.
317 abPayload = array.array('B');
318 if cbMsg > 16:
319 if cbMsg % 16:
320 cbPadding = 16 - (cbMsg % 16);
321 else:
322 cbPadding = 0;
323 abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False);
324 if abPayload is None:
325 self.abReadAheadHdr = abHdr;
326 if not fNoDataOk :
327 reporter.log('recvMsg: failed to recv payload bytes!');
328 return (None, None, None);
329
330 while cbPadding > 0:
331 abPayload.pop();
332 cbPadding = cbPadding - 1;
333
334 # Check the CRC-32.
335 if uCrc32 != 0:
336 uActualCrc32 = zlib.crc32(abHdr[8:]);
337 if cbMsg > 16:
338 uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
339 uActualCrc32 = uActualCrc32 & 0xffffffff;
340 if uCrc32 != uActualCrc32:
341 reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
342 return (None, None, None);
343
344 reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
345 return (cbMsg, sOpcode, abPayload);
346
347 def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
348 """
349 Sends a message (opcode + payload tuple).
350
351 Returns True on success.
352 Returns False on failure and error details in the log.
353 Returns None if you pass the incorrectly typed parameters.
354 """
355 # Encode the payload.
356 abPayload = array.array('B');
357 for o in aoPayload:
358 try:
359 if utils.isString(o):
360 if sys.version_info[0] >= 3:
361 abPayload.extend(o.encode('utf_8'));
362 else:
363 # the primitive approach...
364 sUtf8 = o.encode('utf_8');
365 for i in range(0, len(sUtf8)):
366 abPayload.append(ord(sUtf8[i]))
367 abPayload.append(0);
368 elif isinstance(o, types.LongType):
369 if o < 0 or o > 0xffffffff:
370 reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
371 return None;
372 abPayload.extend(u32ToByteArray(o));
373 elif isinstance(o, array.array):
374 abPayload.extend(o);
375 else:
376 reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
377 return None;
378 except:
379 reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
380 return None;
381 return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
382
383
384class Session(TdTaskBase):
385 """
386 A Test eXecution Service (TXS) client session.
387 """
388
389 def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False):
390 """
391 Construct a TXS session.
392
393 This starts by connecting to the TXS and will enter the signalled state
394 when connected or the timeout has been reached.
395 """
396 TdTaskBase.__init__(self, utils.getCallerName());
397 self.oTransport = oTransport;
398 self.sStatus = "";
399 self.cMsTimeout = 0;
400 self.fErr = True; # Whether to report errors as error.
401 self.msStart = 0;
402 self.oThread = None;
403 self.fnTask = self.taskDummy;
404 self.aTaskArgs = None;
405 self.oTaskRc = None;
406 self.t3oReply = (None, None, None);
407 self.fScrewedUpMsgState = False;
408 self.fTryConnect = fTryConnect;
409
410 if not self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,)):
411 raise base.GenError("startTask failed");
412
413 def __del__(self):
414 """Make sure to cancel the task when deleted."""
415 self.cancelTask();
416
417 def toString(self):
418 return '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
419 ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>' \
420 % (TdTaskBase.toString(self), self.fnTask, self.aTaskArgs, self.sStatus, self.oTaskRc, self.cMsTimeout,
421 self.msStart, self.fTryConnect, self.fErr, self.fScrewedUpMsgState, self.t3oReply, self.oTransport, self.oThread);
422
423 def taskDummy(self):
424 """Place holder to catch broken state handling."""
425 raise Exception();
426
427 def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
428 """
429 Kicks of a new task.
430
431 cMsTimeout: The task timeout in milliseconds. Values less than
432 500 ms will be adjusted to 500 ms. This means it is
433 OK to use negative value.
434 sStatus: The task status.
435 fnTask: The method that'll execute the task.
436 aArgs: Arguments to pass to fnTask.
437
438 Returns True on success, False + error in log on failure.
439 """
440 if not self.cancelTask():
441 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
442 return False;
443
444 # Change status and make sure we're the
445 self.lockTask();
446 if self.sStatus != "":
447 self.unlockTask();
448 reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
449 return False;
450 self.sStatus = "setup";
451 self.oTaskRc = None;
452 self.t3oReply = (None, None, None);
453 self.resetTaskLocked();
454 self.unlockTask();
455
456 self.cMsTimeout = max(cMsTimeout, 500);
457 self.fErr = not fIgnoreErrors;
458 self.fnTask = fnTask;
459 self.aTaskArgs = aArgs;
460 self.oThread = threading.Thread(target=self.taskThread, args=(), name=('TXS-%s' % (sStatus)));
461 self.oThread.setDaemon(True);
462 self.msStart = base.timestampMilli();
463
464 self.lockTask();
465 self.sStatus = sStatus;
466 self.unlockTask();
467 self.oThread.start();
468
469 return True;
470
471 def cancelTask(self, fSync = True):
472 """
473 Attempts to cancel any pending tasks.
474 Returns success indicator (True/False).
475 """
476 self.lockTask();
477
478 if self.sStatus == "":
479 self.unlockTask();
480 return True;
481 if self.sStatus == "setup":
482 self.unlockTask();
483 return False;
484 if self.sStatus == "cancelled":
485 self.unlockTask();
486 return False;
487
488 reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
489 if self.sStatus == 'connecting':
490 self.oTransport.cancelConnect();
491
492 self.sStatus = "cancelled";
493 oThread = self.oThread;
494 self.unlockTask();
495
496 if not fSync:
497 return False;
498
499 oThread.join(61.0);
500 return oThread.isAlive();
501
502 def taskThread(self):
503 """
504 The task thread function.
505 This does some housekeeping activities around the real task method call.
506 """
507 if not self.isCancelled():
508 try:
509 fnTask = self.fnTask;
510 oTaskRc = fnTask(*self.aTaskArgs);
511 except:
512 reporter.fatalXcpt('taskThread', 15);
513 oTaskRc = None;
514 else:
515 reporter.log('taskThread: cancelled already');
516
517 self.lockTask();
518
519 reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
520 self.oTaskRc = oTaskRc;
521 self.oThread = None;
522 self.sStatus = '';
523 self.signalTaskLocked();
524
525 self.unlockTask();
526 return None;
527
528 def isCancelled(self):
529 """Internal method for checking if the task has been cancelled."""
530 self.lockTask();
531 sStatus = self.sStatus;
532 self.unlockTask();
533 if sStatus == "cancelled":
534 return True;
535 return False;
536
537 def hasTimedOut(self):
538 """Internal method for checking if the task has timed out or not."""
539 cMsLeft = self.getMsLeft();
540 if cMsLeft <= 0:
541 return True;
542 return False;
543
544 def getMsLeft(self, cMsMin = 0, cMsMax = -1):
545 """Gets the time left until the timeout."""
546 cMsElapsed = base.timestampMilli() - self.msStart;
547 if cMsElapsed < 0:
548 return cMsMin;
549 cMsLeft = self.cMsTimeout - cMsElapsed;
550 if cMsLeft <= cMsMin:
551 return cMsMin;
552 if cMsLeft > cMsMax and cMsMax > 0:
553 return cMsMax
554 return cMsLeft;
555
556 def recvReply(self, cMsTimeout = None, fNoDataOk = False):
557 """
558 Wrapper for TransportBase.recvMsg that stashes the response away
559 so the client can inspect it later on.
560 """
561 if cMsTimeout is None:
562 cMsTimeout = self.getMsLeft(500);
563 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsTimeout, fNoDataOk);
564 self.lockTask();
565 self.t3oReply = (cbMsg, sOpcode, abPayload);
566 self.unlockTask();
567 return (cbMsg, sOpcode, abPayload);
568
569 def recvAck(self, fNoDataOk = False):
570 """
571 Receives an ACK or error response from the TXS.
572
573 Returns True on success.
574 Returns False on timeout or transport error.
575 Returns (sOpcode, sDetails) tuple on failure. The opcode is stripped
576 and there are always details of some sort or another.
577 """
578 cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
579 if cbMsg is None:
580 return False;
581 sOpcode = sOpcode.strip()
582 if sOpcode == "ACK":
583 return True;
584 return (sOpcode, getSZ(abPayload, 0, sOpcode));
585
586 def recvAckLogged(self, sCommand, fNoDataOk = False):
587 """
588 Wrapper for recvAck and logging.
589 Returns True on success (ACK).
590 Returns False on time, transport error and errors signalled by TXS.
591 """
592 rc = self.recvAck(fNoDataOk);
593 if rc is not True and not fNoDataOk:
594 if rc is False:
595 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
596 else:
597 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, rc[0], rc[1]));
598 rc = False;
599 return rc;
600
601 def recvTrueFalse(self, sCommand):
602 """
603 Receives a TRUE/FALSE response from the TXS.
604 Returns True on TRUE, False on FALSE and None on error/other (logged).
605 """
606 cbMsg, sOpcode, abPayload = self.recvReply();
607 if cbMsg is None:
608 reporter.maybeErr(self.fErr, 'recvAckLogged: %s transport error' % (sCommand));
609 return None;
610
611 sOpcode = sOpcode.strip()
612 if sOpcode == "TRUE":
613 return True;
614 if sOpcode == "FALSE":
615 return False;
616 reporter.maybeErr(self.fErr, 'recvAckLogged: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
617 return None;
618
619 def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
620 """
621 Wrapper for TransportBase.sendMsg that inserts the correct timeout.
622 """
623 if cMsTimeout is None:
624 cMsTimeout = self.getMsLeft(500);
625 return self.oTransport.sendMsg(sOpcode, cMsTimeout, aoPayload);
626
627 def asyncToSync(self, fnAsync, *aArgs):
628 """
629 Wraps an asynchronous task into a synchronous operation.
630
631 Returns False on failure, task return status on success.
632 """
633 rc = fnAsync(*aArgs);
634 if rc is False:
635 reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
636 return rc;
637
638 rc = self.waitForTask(self.cMsTimeout + 5000);
639 if rc is False:
640 reporter.maybeErrXcpt(self.fErr, 'asyncToSync: waitForTask failed...');
641 self.cancelTask();
642 #reporter.log2('asyncToSync(%s): returns False (#2)' % (fnAsync, rc));
643 return False;
644
645 rc = self.getResult();
646 #reporter.log2('asyncToSync(%s): returns %s' % (fnAsync, rc));
647 return rc;
648
649 #
650 # Connection tasks.
651 #
652
653 def taskConnect(self, cMsIdleFudge):
654 """Tries to connect to the TXS"""
655 while not self.isCancelled():
656 reporter.log2('taskConnect: connecting ...');
657 rc = self.oTransport.connect(self.getMsLeft(500));
658 if rc is True:
659 reporter.log('taskConnect: succeeded');
660 return self.taskGreet(cMsIdleFudge);
661 if rc is None:
662 reporter.log2('taskConnect: unable to connect');
663 return None;
664 if self.hasTimedOut():
665 reporter.log2('taskConnect: timed out');
666 if not self.fTryConnect:
667 reporter.maybeErr(self.fErr, 'taskConnect: timed out');
668 return False;
669 time.sleep(self.getMsLeft(1, 1000) / 1000.0);
670 if not self.fTryConnect:
671 reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
672 return False;
673
674 def taskGreet(self, cMsIdleFudge):
675 """Greets the TXS"""
676 rc = self.sendMsg("HOWDY", ());
677 if rc is True:
678 rc = self.recvAckLogged("HOWDY", self.fTryConnect);
679 if rc is True:
680 while cMsIdleFudge > 0:
681 cMsIdleFudge -= 1000;
682 time.sleep(1);
683 else:
684 self.oTransport.disconnect(self.fTryConnect);
685 return rc;
686
687 def taskBye(self):
688 """Says goodbye to the TXS"""
689 rc = self.sendMsg("BYE");
690 if rc is True:
691 rc = self.recvAckLogged("BYE");
692 self.oTransport.disconnect();
693 return rc;
694
695 def taskUuid(self):
696 """Gets the TXS UUID"""
697 rc = self.sendMsg("UUID");
698 if rc is True:
699 rc = False;
700 cbMsg, sOpcode, abPayload = self.recvReply();
701 if cbMsg is not None:
702 sOpcode = sOpcode.strip()
703 if sOpcode == "ACK UUID":
704 sUuid = getSZ(abPayload, 0);
705 if sUuid is not None:
706 sUuid = '{%s}' % (sUuid,)
707 try:
708 _ = uuid.UUID(sUuid);
709 rc = sUuid;
710 except:
711 reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
712 else:
713 reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
714 else:
715 reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
716 else:
717 reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
718 return rc;
719
720 #
721 # Process task
722 # pylint: disable=C0111
723 #
724
725 def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=R0913,R0914,R0915,C0301
726 # Construct the payload.
727 aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
728 for sArg in asArgs:
729 aoPayload.append('%s' % (sArg));
730 aoPayload.append(long(len(asAddEnv)));
731 for sPutEnv in asAddEnv:
732 aoPayload.append('%s' % (sPutEnv));
733 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
734 if utils.isString(o):
735 aoPayload.append(o);
736 elif o is not None:
737 aoPayload.append('|');
738 o.uTxsClientCrc32 = zlib.crc32('');
739 else:
740 aoPayload.append('');
741 aoPayload.append('%s' % (sAsUser));
742 aoPayload.append(long(self.cMsTimeout));
743
744 # Kick of the EXEC command.
745 rc = self.sendMsg('EXEC', aoPayload)
746 if rc is True:
747 rc = self.recvAckLogged('EXEC');
748 if rc is True:
749 # Loop till the process completes, feed input to the TXS and
750 # receive output from it.
751 sFailure = "";
752 msPendingInputReply = None;
753 cbMsg, sOpcode, abPayload = (None, None, None);
754 while True:
755 # Pending input?
756 if msPendingInputReply is None \
757 and oStdIn is not None \
758 and not utils.isString(oStdIn):
759 try:
760 sInput = oStdIn.read(65536);
761 except:
762 reporter.errorXcpt('read standard in');
763 sFailure = 'exception reading stdin';
764 rc = None;
765 break;
766 if sInput:
767 oStdIn.uTxsClientCrc32 = zlib.crc32(sInput, oStdIn.uTxsClientCrc32);
768 # Convert to a byte array before handing it of to sendMsg or the string
769 # will get some zero termination added breaking the CRC (and injecting
770 # unwanted bytes).
771 abInput = array.array('B', sInput);
772 rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
773 if rc is not True:
774 sFailure = 'sendMsg failure';
775 break;
776 msPendingInputReply = base.timestampMilli();
777 continue;
778
779 rc = self.sendMsg('STDINEOS');
780 oStdIn = None;
781 if rc is not True:
782 sFailure = 'sendMsg failure';
783 break;
784 msPendingInputReply = base.timestampMilli();
785
786 # Wait for input (500 ms timeout).
787 if cbMsg is None:
788 cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
789 if cbMsg is None:
790 # Check for time out before restarting the loop.
791 # Note! Only doing timeout checking here does mean that
792 # the TXS may prevent us from timing out by
793 # flooding us with data. This is unlikely though.
794 if self.hasTimedOut() \
795 and ( msPendingInputReply is None \
796 or base.timestampMilli() - msPendingInputReply > 30000):
797 reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
798 sFailure = 'timeout';
799 rc = None;
800 break;
801 # Check that the connection is OK.
802 if not self.oTransport.isConnectionOk():
803 self.oTransport.disconnect();
804 sFailure = 'disconnected';
805 rc = False;
806 break;
807 continue;
808
809 # Handle the response.
810 sOpcode = sOpcode.rstrip();
811 if sOpcode == 'STDOUT':
812 oOut = oStdOut;
813 elif sOpcode == 'STDERR':
814 oOut = oStdErr;
815 elif sOpcode == 'TESTPIPE':
816 oOut = oTestPipe;
817 else:
818 oOut = None;
819 if oOut is not None:
820 # Output from the process.
821 if len(abPayload) < 4:
822 sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
823 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
824 rc = None;
825 break;
826 uStreamCrc32 = getU32(abPayload, 0);
827 oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
828 if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
829 sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes)' \
830 % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg);
831 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
832 rc = None;
833 break;
834 try:
835 oOut.write(abPayload[4:]);
836 except:
837 sFailure = 'exception writing %s' % (sOpcode);
838 reporter.errorXcpt('taskExecEx: %s' % (sFailure));
839 rc = None;
840 break;
841 elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
842 # Standard input is ignored. Ignore this condition for now.
843 msPendingInputReply = None;
844 reporter.log('taskExecEx: Standard input is ignored... why?');
845 del oStdIn.uTxsClientCrc32;
846 oStdIn = '/dev/null';
847 elif (sOpcode == 'STDINMEM' or sOpcode == 'STDINBAD' or sOpcode == 'STDINCRC')\
848 and msPendingInputReply is not None:
849 # TXS STDIN error, abort.
850 # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
851 msPendingInputReply = None;
852 sFailure = 'TXS is out of memory for std input buffering';
853 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
854 rc = None;
855 break;
856 elif sOpcode == 'ACK' and msPendingInputReply is not None:
857 msPendingInputReply = None;
858 elif sOpcode.startswith('PROC '):
859 # Process status message, handle it outside the loop.
860 rc = True;
861 break;
862 else:
863 sFailure = 'Unexpected opcode %s' % (sOpcode);
864 reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
865 rc = None;
866 break;
867 # Clear the message.
868 cbMsg, sOpcode, abPayload = (None, None, None);
869
870 # If we sent an STDIN packet and didn't get a reply yet, we'll give
871 # TXS some 5 seconds to reply to this. If we don't wait here we'll
872 # get screwed later on if we mix it up with the reply to some other
873 # command. Hackish.
874 if msPendingInputReply is not None:
875 cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
876 if cbMsg2 is not None:
877 reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
878 % (cbMsg2, sOpcode2, abPayload2));
879 msPendingInputReply = None;
880 else:
881 reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
882 self.fScrewedUpMsgState = True;
883
884 # Parse the exit status (True), abort (None) or do nothing (False).
885 if rc is True:
886 if sOpcode != 'PROC OK':
887 # Do proper parsing some other day if needed:
888 # PROC TOK, PROC TOA, PROC DWN, PROC DOO,
889 # PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
890 rc = False;
891 else:
892 if rc is None:
893 # Abort it.
894 reporter.log('taskExecEx: sending ABORT...');
895 rc = self.sendMsg('ABORT');
896 while rc is True:
897 cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
898 if cbMsg is None:
899 reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
900 self.fScrewedUpMsgState = True;
901 break;
902 if sOpcode.startswith('PROC '):
903 reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
904 break;
905 reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
906 # Check that the connection is OK before looping.
907 if not self.oTransport.isConnectionOk():
908 self.oTransport.disconnect();
909 break;
910
911 # Fake response with the reason why we quit.
912 if sFailure is not None:
913 self.t3oReply = (0, 'EXECFAIL', sFailure);
914 rc = None;
915 else:
916 rc = None;
917
918 # Cleanup.
919 for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
920 if o is not None and not utils.isString(o):
921 del o.uTxsClientCrc32; # pylint: disable=E1103
922 # Make sure all files are closed
923 o.close(); # pylint: disable=E1103
924 reporter.log('taskExecEx: returns %s' % (rc));
925 return rc;
926
927 #
928 # Admin tasks
929 #
930
931 def hlpRebootShutdownWaitForAck(self, sCmd):
932 """Wait for reboot/shutodwn ACK."""
933 rc = self.recvAckLogged(sCmd);
934 if rc is True:
935 # poll a little while for server to disconnect.
936 uMsStart = base.timestampMilli();
937 while self.oTransport.isConnectionOk() \
938 and base.timestampMilli() - uMsStart >= 5000:
939 if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
940 break;
941 self.oTransport.disconnect();
942 return rc;
943
944 def taskReboot(self):
945 rc = self.sendMsg('REBOOT');
946 if rc is True:
947 rc = self.hlpRebootShutdownWaitForAck('REBOOT');
948 return rc;
949
950 def taskShutdown(self):
951 rc = self.sendMsg('SHUTDOWN');
952 if rc is True:
953 rc = self.hlpRebootShutdownWaitForAck('SHUTDOWN');
954 return rc;
955
956 #
957 # CD/DVD control tasks.
958 #
959
960 ## TODO
961
962 #
963 # File system tasks
964 #
965
966 def taskMkDir(self, sRemoteDir, fMode):
967 rc = self.sendMsg('MKDIR', (fMode, sRemoteDir));
968 if rc is True:
969 rc = self.recvAckLogged('MKDIR');
970 return rc;
971
972 def taskMkDirPath(self, sRemoteDir, fMode):
973 rc = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
974 if rc is True:
975 rc = self.recvAckLogged('MKDRPATH');
976 return rc;
977
978 def taskMkSymlink(self, sLinkTarget, sLink):
979 rc = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
980 if rc is True:
981 rc = self.recvAckLogged('MKSYMLNK');
982 return rc;
983
984 def taskRmDir(self, sRemoteDir):
985 rc = self.sendMsg('RMDIR', (sRemoteDir,));
986 if rc is True:
987 rc = self.recvAckLogged('RMDIR');
988 return rc;
989
990 def taskRmFile(self, sRemoteFile):
991 rc = self.sendMsg('RMFILE', (sRemoteFile,));
992 if rc is True:
993 rc = self.recvAckLogged('RMFILE');
994 return rc;
995
996 def taskRmSymlink(self, sRemoteSymlink):
997 rc = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
998 if rc is True:
999 rc = self.recvAckLogged('RMSYMLNK');
1000 return rc;
1001
1002 def taskRmTree(self, sRemoteTree):
1003 rc = self.sendMsg('RMTREE', (sRemoteTree,));
1004 if rc is True:
1005 rc = self.recvAckLogged('RMTREE');
1006 return rc;
1007
1008 #def "CHMOD "
1009 #def "CHOWN "
1010 #def "CHGRP "
1011
1012 def taskIsDir(self, sRemoteDir):
1013 rc = self.sendMsg('ISDIR', (sRemoteDir,));
1014 if rc is True:
1015 rc = self.recvTrueFalse('ISDIR');
1016 return rc;
1017
1018 def taskIsFile(self, sRemoteFile):
1019 rc = self.sendMsg('ISFILE', (sRemoteFile,));
1020 if rc is True:
1021 rc = self.recvTrueFalse('ISFILE');
1022 return rc;
1023
1024 def taskIsSymlink(self, sRemoteSymlink):
1025 rc = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
1026 if rc is True:
1027 rc = self.recvTrueFalse('ISSYMLNK');
1028 return rc;
1029
1030 #def "STAT "
1031 #def "LSTAT "
1032 #def "LIST "
1033
1034 def taskUploadFile(self, sLocalFile, sRemoteFile):
1035 #
1036 # Open the local file (make sure it exist before bothering TXS) and
1037 # tell TXS that we want to upload a file.
1038 #
1039 try:
1040 oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
1041 except:
1042 reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
1043 return False;
1044
1045 # Common cause with taskUploadStr
1046 rc = self.taskUploadCommon(oLocalFile, sRemoteFile);
1047
1048 # Cleanup.
1049 oLocalFile.close();
1050 return rc;
1051
1052 def taskUploadString(self, sContent, sRemoteFile):
1053 # Wrap sContent in a file like class.
1054 class InStringFile(object): # pylint: disable=R0903
1055 def __init__(self, sContent):
1056 self.sContent = sContent;
1057 self.off = 0;
1058
1059 def read(self, cbMax):
1060 cbLeft = len(self.sContent) - self.off;
1061 if cbLeft == 0:
1062 return "";
1063 if cbLeft <= cbMax:
1064 sRet = self.sContent[self.off:(self.off + cbLeft)];
1065 else:
1066 sRet = self.sContent[self.off:(self.off + cbMax)];
1067 self.off = self.off + len(sRet);
1068 return sRet;
1069
1070 oLocalString = InStringFile(sContent);
1071 return self.taskUploadCommon(oLocalString, sRemoteFile);
1072
1073 def taskUploadCommon(self, oLocalFile, sRemoteFile):
1074 """Common worker used by taskUploadFile and taskUploadString."""
1075 # Command + ACK.
1076 rc = self.sendMsg('PUT FILE', (sRemoteFile,));
1077 if rc is True:
1078 rc = self.recvAckLogged('PUT FILE');
1079 if rc is True:
1080 #
1081 # Push data packets until eof.
1082 #
1083 uMyCrc32 = zlib.crc32("");
1084 while True:
1085 # Read up to 64 KB of data.
1086 try:
1087 sRaw = oLocalFile.read(65536);
1088 except:
1089 rc = None;
1090 break;
1091
1092 # Convert to array - this is silly!
1093 abBuf = array.array('B');
1094 if utils.isString(sRaw):
1095 for i, _ in enumerate(sRaw):
1096 abBuf.append(ord(sRaw[i]));
1097 else:
1098 abBuf.extend(sRaw);
1099 sRaw = None;
1100
1101 # Update the file stream CRC and send it off.
1102 uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
1103 if not abBuf:
1104 rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
1105 else:
1106 rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
1107 if rc is False:
1108 break;
1109
1110 # Wait for the reply.
1111 rc = self.recvAck();
1112 if rc is not True:
1113 if rc is False:
1114 reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
1115 else:
1116 reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
1117 rc = False;
1118 break;
1119
1120 # EOF?
1121 if not abBuf:
1122 break;
1123
1124 # Send ABORT on ACK and I/O errors.
1125 if rc is None:
1126 rc = self.sendMsg('ABORT');
1127 if rc is True:
1128 self.recvAckLogged('ABORT');
1129 rc = False;
1130 return rc;
1131
1132 def taskDownloadFile(self, sRemoteFile, sLocalFile):
1133 try:
1134 oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
1135 except:
1136 reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
1137 return False;
1138
1139 rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
1140
1141 oLocalFile.close();
1142 if rc is False:
1143 try:
1144 os.remove(sLocalFile);
1145 except:
1146 reporter.errorXcpt();
1147 return rc;
1148
1149 def taskDownloadString(self, sRemoteFile):
1150 # Wrap sContent in a file like class.
1151 class OutStringFile(object): # pylint: disable=R0903
1152 def __init__(self):
1153 self.asContent = [];
1154
1155 def write(self, sBuf):
1156 self.asContent.append(sBuf);
1157 return None;
1158
1159 oLocalString = OutStringFile();
1160 rc = self.taskDownloadCommon(sRemoteFile, oLocalString);
1161 if rc is True:
1162 if not oLocalString.asContent:
1163 rc = '';
1164 else:
1165 rc = ''.join(oLocalString.asContent);
1166 return rc;
1167
1168 def taskDownloadCommon(self, sRemoteFile, oLocalFile):
1169 """Common worker for taskDownloadFile and taskDownloadString."""
1170 rc = self.sendMsg('GET FILE', (sRemoteFile,))
1171 if rc is True:
1172 #
1173 # Process data packets until eof.
1174 #
1175 uMyCrc32 = zlib.crc32("");
1176 while rc is True:
1177 cbMsg, sOpcode, abPayload = self.recvReply();
1178 if cbMsg is None:
1179 reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
1180 rc = None;
1181 break;
1182
1183 # Validate.
1184 sOpcode = sOpcode.rstrip();
1185 if sOpcode != 'DATA' and sOpcode != 'DATA EOF':
1186 reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
1187 % (sOpcode, getSZ(abPayload, 0, "None")));
1188 rc = False;
1189 break;
1190 if sOpcode == 'DATA' and len(abPayload) < 4:
1191 reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
1192 rc = None;
1193 break;
1194 if sOpcode == 'DATA EOF' and len(abPayload) != 4:
1195 reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
1196 rc = None;
1197 break;
1198
1199 # Check the CRC (common for both packets).
1200 uCrc32 = getU32(abPayload, 0);
1201 if sOpcode == 'DATA':
1202 uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
1203 if uCrc32 != (uMyCrc32 & 0xffffffff):
1204 reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
1205 % (hex(uMyCrc32), hex(uCrc32)));
1206 rc = None;
1207 break;
1208 if sOpcode == 'DATA EOF':
1209 rc = self.sendMsg('ACK');
1210 break;
1211
1212 # Finally, push the data to the file.
1213 try:
1214 oLocalFile.write(abPayload[4:].tostring());
1215 except:
1216 reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
1217 rc = None;
1218 break;
1219 rc = self.sendMsg('ACK');
1220
1221 # Send NACK on validation and I/O errors.
1222 if rc is None:
1223 rc = self.sendMsg('NACK');
1224 rc = False;
1225 return rc;
1226
1227 def taskUnpackFile(self, sRemoteFile, sRemoteDir):
1228 rc = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
1229 if rc is True:
1230 rc = self.recvAckLogged('UNPKFILE');
1231 return rc;
1232
1233 # pylint: enable=C0111
1234
1235
1236 #
1237 # Public methods - generic task queries
1238 #
1239
1240 def isSuccess(self):
1241 """Returns True if the task completed successfully, otherwise False."""
1242 self.lockTask();
1243 sStatus = self.sStatus;
1244 oTaskRc = self.oTaskRc;
1245 self.unlockTask();
1246 if sStatus != "":
1247 return False;
1248 if oTaskRc is False or oTaskRc is None:
1249 return False;
1250 return True;
1251
1252 def getResult(self):
1253 """
1254 Returns the result of a completed task.
1255 Returns None if not completed yet or no previous task.
1256 """
1257 self.lockTask();
1258 sStatus = self.sStatus;
1259 oTaskRc = self.oTaskRc;
1260 self.unlockTask();
1261 if sStatus != "":
1262 return None;
1263 return oTaskRc;
1264
1265 def getLastReply(self):
1266 """
1267 Returns the last reply three-tuple: cbMsg, sOpcode, abPayload.
1268 Returns a None, None, None three-tuple if there was no last reply.
1269 """
1270 self.lockTask();
1271 t3oReply = self.t3oReply;
1272 self.unlockTask();
1273 return t3oReply;
1274
1275 #
1276 # Public methods - connection.
1277 #
1278
1279 def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1280 """
1281 Initiates a disconnect task.
1282
1283 Returns True on success, False on failure (logged).
1284
1285 The task returns True on success and False on failure.
1286 """
1287 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskBye);
1288
1289 def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
1290 """Synchronous version."""
1291 return self.asyncToSync(self.asyncDisconnect, cMsTimeout, fIgnoreErrors);
1292
1293 def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1294 """
1295 Initiates a task for getting the TXS UUID.
1296
1297 Returns True on success, False on failure (logged).
1298
1299 The task returns UUID string (in {}) on success and False on failure.
1300 """
1301 return self.startTask(cMsTimeout, fIgnoreErrors, "bye", self.taskUuid);
1302
1303 def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
1304 """Synchronous version."""
1305 return self.asyncToSync(self.asyncUuid, cMsTimeout, fIgnoreErrors);
1306
1307 #
1308 # Public methods - execution.
1309 #
1310
1311 def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1312 oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
1313 sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
1314 """
1315 Initiates a exec process task.
1316
1317 Returns True on success, False on failure (logged).
1318
1319 The task returns True if the process exited normally with status code 0.
1320 The task returns None if on failure prior to executing the process, and
1321 False if the process exited with a different status or in an abnormal
1322 manner. Both None and False are logged of course and further info can
1323 also be obtained by getLastReply().
1324
1325 The oStdIn, oStdOut, oStdErr and oTestPipe specifiy how to deal with
1326 these streams. If None, no special action is taken and the output goes
1327 to where ever the TXS sends its output, and ditto for input.
1328 - To send to / read from the bitbucket, pass '/dev/null'.
1329 - To redirect to/from a file, just specify the remote filename.
1330 - To append to a file use '>>' followed by the remote filename.
1331 - To pipe the stream to/from the TXS, specify a file like
1332 object. For StdIn a non-blocking read() method is required. For
1333 the other a write() method is required. Watch out for deadlock
1334 conditions between StdIn and StdOut/StdErr/TestPipe piping.
1335 """
1336 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1337 (sExecName, long(0), asArgs, asAddEnv, oStdIn,
1338 oStdOut, oStdErr, oTestPipe, sAsUser));
1339
1340 def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=R0913
1341 oStdIn = '/dev/null', oStdOut = '/dev/null',
1342 oStdErr = '/dev/null', oTestPipe = '/dev/null',
1343 sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
1344 """Synchronous version."""
1345 return self.asyncToSync(self.asyncExecEx, sExecName, asArgs, asAddEnv, oStdIn, oStdOut, \
1346 oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
1347
1348 def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '', \
1349 cMsTimeout = 3600000, fIgnoreErrors = False):
1350 """
1351 Initiates a exec process test task.
1352
1353 Returns True on success, False on failure (logged).
1354
1355 The task returns True if the process exited normally with status code 0.
1356 The task returns None if on failure prior to executing the process, and
1357 False if the process exited with a different status or in an abnormal
1358 manner. Both None and False are logged of course and further info can
1359 also be obtained by getLastReply().
1360
1361 Standard in is taken from /dev/null. While both standard output and
1362 standard error goes directly to reporter.log(). The testpipe is piped
1363 to reporter.xxxx.
1364 """
1365
1366 sStdIn = '/dev/null';
1367 oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
1368 oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
1369 if fWithTestPipe: oTestPipe = reporter.FileWrapperTestPipe();
1370 else: oTestPipe = '/dev/null'; # pylint: disable=redefined-variable-type
1371
1372 return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx,
1373 (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser));
1374
1375 def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
1376 cMsTimeout = 3600000, fIgnoreErrors = False):
1377 """Synchronous version."""
1378 return self.asyncToSync(self.asyncExec, sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix, \
1379 cMsTimeout, fIgnoreErrors);
1380
1381 #
1382 # Public methods - file system
1383 #
1384
1385 def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1386 """
1387 Initiates a reboot task.
1388
1389 Returns True on success, False on failure (logged).
1390
1391 The task returns True on success, False on failure (logged). The
1392 session will be disconnected on successful task completion.
1393 """
1394 return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, ());
1395
1396 def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
1397 """Synchronous version."""
1398 return self.asyncToSync(self.asyncReboot, cMsTimeout, fIgnoreErrors);
1399
1400 def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1401 """
1402 Initiates a shutdown task.
1403
1404 Returns True on success, False on failure (logged).
1405
1406 The task returns True on success, False on failure (logged).
1407 """
1408 return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, ());
1409
1410 def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
1411 """Synchronous version."""
1412 return self.asyncToSync(self.asyncShutdown, cMsTimeout, fIgnoreErrors);
1413
1414
1415 #
1416 # Public methods - file system
1417 #
1418
1419 def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1420 """
1421 Initiates a mkdir task.
1422
1423 Returns True on success, False on failure (logged).
1424
1425 The task returns True on success, False on failure (logged).
1426 """
1427 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, (sRemoteDir, long(fMode)));
1428
1429 def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1430 """Synchronous version."""
1431 return self.asyncToSync(self.asyncMkDir, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1432
1433 def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1434 """
1435 Initiates a mkdir -p task.
1436
1437 Returns True on success, False on failure (logged).
1438
1439 The task returns True on success, False on failure (logged).
1440 """
1441 return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, (sRemoteDir, long(fMode)));
1442
1443 def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
1444 """Synchronous version."""
1445 return self.asyncToSync(self.asyncMkDirPath, sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
1446
1447 def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1448 """
1449 Initiates a symlink task.
1450
1451 Returns True on success, False on failure (logged).
1452
1453 The task returns True on success, False on failure (logged).
1454 """
1455 return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, (sLinkTarget, sLink));
1456
1457 def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
1458 """Synchronous version."""
1459 return self.asyncToSync(self.asyncMkSymlink, sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
1460
1461 def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1462 """
1463 Initiates a rmdir task.
1464
1465 Returns True on success, False on failure (logged).
1466
1467 The task returns True on success, False on failure (logged).
1468 """
1469 return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, (sRemoteDir,));
1470
1471 def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1472 """Synchronous version."""
1473 return self.asyncToSync(self.asyncRmDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1474
1475 def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1476 """
1477 Initiates a rmfile task.
1478
1479 Returns True on success, False on failure (logged).
1480
1481 The task returns True on success, False on failure (logged).
1482 """
1483 return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, (sRemoteFile,));
1484
1485 def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1486 """Synchronous version."""
1487 return self.asyncToSync(self.asyncRmFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1488
1489 def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1490 """
1491 Initiates a rmsymlink task.
1492
1493 Returns True on success, False on failure (logged).
1494
1495 The task returns True on success, False on failure (logged).
1496 """
1497 return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, (sRemoteSymlink,));
1498
1499 def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1500 """Synchronous version."""
1501 return self.asyncToSync(self.asyncRmSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1502
1503 def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1504 """
1505 Initiates a rmtree task.
1506
1507 Returns True on success, False on failure (logged).
1508
1509 The task returns True on success, False on failure (logged).
1510 """
1511 return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, (sRemoteTree,));
1512
1513 def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
1514 """Synchronous version."""
1515 return self.asyncToSync(self.asyncRmTree, sRemoteTree, cMsTimeout, fIgnoreErrors);
1516
1517 #def "CHMOD "
1518 #def "CHOWN "
1519 #def "CHGRP "
1520
1521 def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1522 """
1523 Initiates a is-dir query task.
1524
1525 Returns True on success, False on failure (logged).
1526
1527 The task returns True if it's a directory, False if it isn't, and
1528 None on error (logged).
1529 """
1530 return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, (sRemoteDir,));
1531
1532 def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1533 """Synchronous version."""
1534 return self.asyncToSync(self.asyncIsDir, sRemoteDir, cMsTimeout, fIgnoreErrors);
1535
1536 def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1537 """
1538 Initiates a is-file query task.
1539
1540 Returns True on success, False on failure (logged).
1541
1542 The task returns True if it's a file, False if it isn't, and None on
1543 error (logged).
1544 """
1545 return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, (sRemoteFile,));
1546
1547 def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1548 """Synchronous version."""
1549 return self.asyncToSync(self.asyncIsFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1550
1551 def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1552 """
1553 Initiates a is-symbolic-link query task.
1554
1555 Returns True on success, False on failure (logged).
1556
1557 The task returns True if it's a symbolic linke, False if it isn't, and
1558 None on error (logged).
1559 """
1560 return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, (sRemoteSymlink,));
1561
1562 def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
1563 """Synchronous version."""
1564 return self.asyncToSync(self.asyncIsSymlink, sRemoteSymlink, cMsTimeout, fIgnoreErrors);
1565
1566 #def "STAT "
1567 #def "LSTAT "
1568 #def "LIST "
1569
1570 def asyncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1571 """
1572 Initiates a download query task.
1573
1574 Returns True on success, False on failure (logged).
1575
1576 The task returns True on success, False on failure (logged).
1577 """
1578 return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, (sLocalFile, sRemoteFile));
1579
1580 def syncUploadFile(self, sLocalFile, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1581 """Synchronous version."""
1582 return self.asyncToSync(self.asyncUploadFile, sLocalFile, sRemoteFile, cMsTimeout, fIgnoreErrors);
1583
1584 def asyncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1585 """
1586 Initiates a upload string task.
1587
1588 Returns True on success, False on failure (logged).
1589
1590 The task returns True on success, False on failure (logged).
1591 """
1592 return self.startTask(cMsTimeout, fIgnoreErrors, "uploadString", self.taskUploadString, (sContent, sRemoteFile));
1593
1594 def syncUploadString(self, sContent, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1595 """Synchronous version."""
1596 return self.asyncToSync(self.asyncUploadString, sContent, sRemoteFile, cMsTimeout, fIgnoreErrors);
1597
1598 def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1599 """
1600 Initiates a download file task.
1601
1602 Returns True on success, False on failure (logged).
1603
1604 The task returns True on success, False on failure (logged).
1605 """
1606 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, (sRemoteFile, sLocalFile));
1607
1608 def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 30000, fIgnoreErrors = False):
1609 """Synchronous version."""
1610 return self.asyncToSync(self.asyncDownloadFile, sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
1611
1612 def asyncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1613 """
1614 Initiates a download string task.
1615
1616 Returns True on success, False on failure (logged).
1617
1618 The task returns a byte string on success, False on failure (logged).
1619 """
1620 return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, (sRemoteFile,));
1621
1622 def syncDownloadString(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
1623 """Synchronous version."""
1624 return self.asyncToSync(self.asyncDownloadString, sRemoteFile, cMsTimeout, fIgnoreErrors);
1625
1626 def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1627 """
1628 Initiates a unpack file task.
1629
1630 Returns True on success, False on failure (logged).
1631
1632 The task returns True on success, False on failure (logged).
1633 """
1634 return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, \
1635 (sRemoteFile, sRemoteDir));
1636
1637 def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
1638 """Synchronous version."""
1639 return self.asyncToSync(self.asyncUnpackFile, sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
1640
1641
1642class TransportTcp(TransportBase):
1643 """
1644 TCP transport layer for the TXS client session class.
1645 """
1646
1647 def __init__(self, sHostname, uPort, fReversedSetup):
1648 """
1649 Save the parameters. The session will call us back to make the
1650 connection later on its worker thread.
1651 """
1652 TransportBase.__init__(self, utils.getCallerName());
1653 self.sHostname = sHostname;
1654 self.fReversedSetup = fReversedSetup;
1655 self.uPort = uPort if uPort is not None else 5042 if fReversedSetup is False else 5048;
1656 self.oSocket = None;
1657 self.oWakeupW = None;
1658 self.oWakeupR = None;
1659 self.fConnectCanceled = False;
1660 self.fIsConnecting = False;
1661 self.oCv = threading.Condition();
1662 self.abReadAhead = array.array('B');
1663
1664 def toString(self):
1665 return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
1666 ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
1667 % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
1668 self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);
1669
1670 def __isInProgressXcpt(self, oXcpt):
1671 """ In progress exception? """
1672 reporter.log("oXcpt=%s" % (oXcpt)) ## TMP TMP
1673 reporter.log("dir(oXcpt)=%s" % (dir(oXcpt))) ## TMP TMP
1674 try:
1675 if isinstance(oXcpt, socket.error):
1676 try:
1677 if oXcpt.errno == errno.EINPROGRESS:
1678 return True;
1679 except: pass;
1680 # Windows?
1681 try:
1682 if oXcpt.errno == errno.EWOULDBLOCK:
1683 return True;
1684 except: pass;
1685 except:
1686 pass;
1687 return False;
1688
1689 def __isWouldBlockXcpt(self, oXcpt):
1690 """ Would block exception? """
1691 try:
1692 if isinstance(oXcpt, socket.error):
1693 try:
1694 if oXcpt.errno == errno.EWOULDBLOCK:
1695 return True;
1696 except: pass;
1697 try:
1698 if oXcpt.errno == errno.EAGAIN:
1699 return True;
1700 except: pass;
1701 except:
1702 pass;
1703 return False;
1704
1705 def __isConnectionReset(self, oXcpt):
1706 """ Connection reset by Peer or others. """
1707 try:
1708 if isinstance(oXcpt, socket.error):
1709 try:
1710 if oXcpt.errno == errno.ECONNRESET:
1711 return True;
1712 except: pass;
1713 try:
1714 if oXcpt.errno == errno.ENETRESET:
1715 return True;
1716 except: pass;
1717 except:
1718 pass;
1719 return False;
1720
1721 def _closeWakeupSockets(self):
1722 """ Closes the wakup sockets. Caller should own the CV. """
1723 oWakeupR = self.oWakeupR;
1724 self.oWakeupR = None;
1725 if oWakeupR is not None:
1726 oWakeupR.close();
1727
1728 oWakeupW = self.oWakeupW;
1729 self.oWakeupW = None;
1730 if oWakeupW is not None:
1731 oWakeupW.close();
1732
1733 return None;
1734
1735 def cancelConnect(self):
1736 # This is bad stuff.
1737 self.oCv.acquire();
1738 reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
1739 self.fConnectCanceled = True;
1740 if self.fIsConnecting:
1741 oSocket = self.oSocket;
1742 self.oSocket = None;
1743 if oSocket is not None:
1744 reporter.log2('TransportTcp::cancelConnect: closing the socket');
1745 oSocket.close();
1746
1747 oWakeupW = self.oWakeupW;
1748 self.oWakeupW = None;
1749 if oWakeupW is not None:
1750 reporter.log2('TransportTcp::cancelConnect: wakeup call');
1751 try: oWakeupW.send('cancelled!\n');
1752 except: reporter.logXcpt();
1753 try: oWakeupW.shutdown(socket.SHUT_WR);
1754 except: reporter.logXcpt();
1755 oWakeupW.close();
1756 self.oCv.release();
1757
1758 def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
1759 """ Connects to the TXS server as server, i.e. the reversed setup. """
1760 assert(self.fReversedSetup);
1761
1762 reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));
1763
1764 # Workaround for bind() failure...
1765 try:
1766 oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
1767 except:
1768 reporter.errorXcpt('socket.listen(1) failed');
1769 return None;
1770
1771 # Bind the socket and make it listen.
1772 try:
1773 oSocket.bind((self.sHostname, self.uPort));
1774 except:
1775 reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
1776 return None;
1777 try:
1778 oSocket.listen(1);
1779 except:
1780 reporter.errorXcpt('socket.listen(1) failed');
1781 return None;
1782
1783 # Accept connections.
1784 oClientSocket = None;
1785 tClientAddr = None;
1786 try:
1787 (oClientSocket, tClientAddr) = oSocket.accept();
1788 except socket.error as e:
1789 if not self.__isInProgressXcpt(e):
1790 raise;
1791
1792 # Do the actual waiting.
1793 reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (e,));
1794 try:
1795 select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1796 except socket.error as e:
1797 if e[0] != errno.EBADF or not self.fConnectCanceled:
1798 raise;
1799 reporter.log('socket.select() on accept was canceled');
1800 return None;
1801 except:
1802 reporter.logXcpt('socket.select() on accept');
1803
1804 # Try accept again.
1805 try:
1806 (oClientSocket, tClientAddr) = oSocket.accept();
1807 except socket.error as e:
1808 if not self.__isInProgressXcpt(e):
1809 if e[0] != errno.EBADF or not self.fConnectCanceled:
1810 raise;
1811 reporter.log('socket.accept() was canceled');
1812 return None;
1813 reporter.log('socket.accept() timed out');
1814 return False;
1815 except:
1816 reporter.errorXcpt('socket.accept() failed');
1817 return None;
1818 except:
1819 reporter.errorXcpt('socket.accept() failed');
1820 return None;
1821
1822 # Store the connected socket and throw away the server socket.
1823 self.oCv.acquire();
1824 if not self.fConnectCanceled:
1825 self.oSocket.close();
1826 self.oSocket = oClientSocket;
1827 self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
1828 self.oCv.release();
1829 return True;
1830
1831 def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
1832 """ Connects to the TXS server as client. """
1833 assert(not self.fReversedSetup);
1834
1835 # Connect w/ timeouts.
1836 rc = None;
1837 try:
1838 oSocket.connect((self.sHostname, self.uPort));
1839 rc = True;
1840 except socket.error as e:
1841 iRc = e[0];
1842 if self.__isInProgressXcpt(e):
1843 # Do the actual waiting.
1844 reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (e,));
1845 try:
1846 ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
1847 if len(ttRc[1]) + len(ttRc[2]) == 0:
1848 raise socket.error(errno.ETIMEDOUT, 'select timed out');
1849 iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
1850 rc = iRc == 0;
1851 except socket.error as e:
1852 iRc = e[0];
1853 except:
1854 iRc = -42;
1855 reporter.fatalXcpt('socket.select() on connect failed');
1856
1857 if rc is True:
1858 pass;
1859 elif iRc == errno.ECONNREFUSED \
1860 or iRc == errno.EHOSTUNREACH \
1861 or iRc == errno.EINTR \
1862 or iRc == errno.ENETDOWN \
1863 or iRc == errno.ENETUNREACH \
1864 or iRc == errno.ETIMEDOUT:
1865 rc = False; # try again.
1866 else:
1867 if iRc != errno.EBADF or not self.fConnectCanceled:
1868 reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
1869 reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
1870 except:
1871 reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
1872 return rc;
1873
1874
1875 def connect(self, cMsTimeout):
1876 # Create a non-blocking socket.
1877 reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
1878 try:
1879 oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
1880 except:
1881 reporter.fatalXcpt('socket.socket() failed');
1882 return None;
1883 try:
1884 oSocket.setblocking(0);
1885 except:
1886 oSocket.close();
1887 reporter.fatalXcpt('socket.socket() failed');
1888 return None;
1889
1890 # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
1891 oWakeupR = None;
1892 oWakeupW = None;
1893 if hasattr(socket, 'socketpair'):
1894 try: (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=E1101
1895 except: reporter.logXcpt('socket.socketpair() failed');
1896
1897 # Update the state.
1898 self.oCv.acquire();
1899 rc = None;
1900 if not self.fConnectCanceled:
1901 self.oSocket = oSocket;
1902 self.oWakeupW = oWakeupW;
1903 self.oWakeupR = oWakeupR;
1904 self.fIsConnecting = True;
1905 self.oCv.release();
1906
1907 # Try connect.
1908 if oWakeupR is None:
1909 oWakeupR = oSocket; # Avoid select failure.
1910 if self.fReversedSetup:
1911 rc = self._connectAsServer(oSocket, oWakeupR, cMsTimeout);
1912 else:
1913 rc = self._connectAsClient(oSocket, oWakeupR, cMsTimeout);
1914 oSocket = None;
1915
1916 # Update the state and cleanup on failure/cancel.
1917 self.oCv.acquire();
1918 if rc is True and self.fConnectCanceled:
1919 rc = False;
1920 self.fIsConnecting = False;
1921
1922 if rc is not True:
1923 if self.oSocket is not None:
1924 self.oSocket.close();
1925 self.oSocket = None;
1926 self._closeWakeupSockets();
1927 self.oCv.release();
1928
1929 reporter.log2('TransportTcp::connect: returning %s' % (rc,));
1930 return rc;
1931
1932 def disconnect(self, fQuiet = False):
1933 if self.oSocket is not None:
1934 self.abReadAhead = array.array('B');
1935
1936 # Try a shutting down the socket gracefully (draining it).
1937 try:
1938 self.oSocket.shutdown(socket.SHUT_WR);
1939 except:
1940 if not fQuiet:
1941 reporter.error('shutdown(SHUT_WR)');
1942 try:
1943 self.oSocket.setblocking(0); # just in case it's not set.
1944 sData = "1";
1945 while sData:
1946 sData = self.oSocket.recv(16384);
1947 except:
1948 pass;
1949
1950 # Close it.
1951 self.oCv.acquire();
1952 try: self.oSocket.setblocking(1);
1953 except: pass;
1954 self.oSocket.close();
1955 self.oSocket = None;
1956 else:
1957 self.oCv.acquire();
1958 self._closeWakeupSockets();
1959 self.oCv.release();
1960
1961 def sendBytes(self, abBuf, cMsTimeout):
1962 if self.oSocket is None:
1963 reporter.error('TransportTcp.sendBytes: No connection.');
1964 return False;
1965
1966 # Try send it all.
1967 try:
1968 cbSent = self.oSocket.send(abBuf);
1969 if cbSent == len(abBuf):
1970 return True;
1971 except Exception as oXcpt:
1972 if not self.__isWouldBlockXcpt(oXcpt):
1973 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
1974 return False;
1975 cbSent = 0;
1976
1977 # Do a timed send.
1978 msStart = base.timestampMilli();
1979 while True:
1980 cMsElapsed = base.timestampMilli() - msStart;
1981 if cMsElapsed > cMsTimeout:
1982 reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
1983 break;
1984
1985 # wait.
1986 try:
1987 ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
1988 if ttRc[2] and not ttRc[1]:
1989 reporter.error('TranportTcp.sendBytes: select returned with exception');
1990 break;
1991 if not ttRc[1]:
1992 reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
1993 break;
1994 except:
1995 reporter.errorXcpt('TranportTcp.sendBytes: select failed');
1996 break;
1997
1998 # Try send more.
1999 try:
2000 cbSent += self.oSocket.send(abBuf[cbSent:]);
2001 if cbSent == len(abBuf):
2002 return True;
2003 except Exception as oXcpt:
2004 if not self.__isWouldBlockXcpt(oXcpt):
2005 reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
2006 break;
2007
2008 return False;
2009
2010 def __returnReadAheadBytes(self, cb):
2011 """ Internal worker for recvBytes. """
2012 assert(len(self.abReadAhead) >= cb);
2013 abRet = self.abReadAhead[:cb];
2014 self.abReadAhead = self.abReadAhead[cb:];
2015 return abRet;
2016
2017 def recvBytes(self, cb, cMsTimeout, fNoDataOk):
2018 if self.oSocket is None:
2019 reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
2020 return None;
2021
2022 # Try read in some more data without bothering with timeout handling first.
2023 if len(self.abReadAhead) < cb:
2024 try:
2025 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2026 if abBuf:
2027 self.abReadAhead.extend(array.array('B', abBuf));
2028 except Exception as oXcpt:
2029 if not self.__isWouldBlockXcpt(oXcpt):
2030 reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
2031 return None;
2032
2033 if len(self.abReadAhead) >= cb:
2034 return self.__returnReadAheadBytes(cb);
2035
2036 # Timeout loop.
2037 msStart = base.timestampMilli();
2038 while True:
2039 cMsElapsed = base.timestampMilli() - msStart;
2040 if cMsElapsed > cMsTimeout:
2041 if not fNoDataOk or self.abReadAhead:
2042 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
2043 break;
2044
2045 # Wait.
2046 try:
2047 ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
2048 if ttRc[2] and not ttRc[0]:
2049 reporter.error('TranportTcp.recvBytes: select returned with exception');
2050 break;
2051 if not ttRc[0]:
2052 if not fNoDataOk or self.abReadAhead:
2053 reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
2054 % (len(self.abReadAhead), cb, fNoDataOk));
2055 break;
2056 except:
2057 reporter.errorXcpt('TranportTcp.recvBytes: select failed');
2058 break;
2059
2060 # Try read more.
2061 try:
2062 abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
2063 if not abBuf:
2064 reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
2065 % (len(self.abReadAhead), cb, fNoDataOk));
2066 self.disconnect();
2067 return None;
2068
2069 self.abReadAhead.extend(array.array('B', abBuf));
2070
2071 except Exception as oXcpt:
2072 reporter.log('recv => exception %s' % (oXcpt,));
2073 if not self.__isWouldBlockXcpt(oXcpt):
2074 if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
2075 reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
2076 break;
2077
2078 # Done?
2079 if len(self.abReadAhead) >= cb:
2080 return self.__returnReadAheadBytes(cb);
2081
2082 #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
2083 return None;
2084
2085 def isConnectionOk(self):
2086 if self.oSocket is None:
2087 return False;
2088 try:
2089 ttRc = select.select([], [], [self.oSocket], 0.0);
2090 if ttRc[2]:
2091 return False;
2092
2093 self.oSocket.send(array.array('B')); # send zero bytes.
2094 except:
2095 return False;
2096 return True;
2097
2098 def isRecvPending(self, cMsTimeout = 0):
2099 try:
2100 ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
2101 if not ttRc[0]:
2102 return False;
2103 except:
2104 pass;
2105 return True;
2106
2107
2108def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2109 """
2110 Opens a connection to a Test Execution Service via TCP, given its name.
2111 """
2112 reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' % \
2113 (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
2114 try:
2115 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2116 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge);
2117 except:
2118 reporter.errorXcpt(None, 15);
2119 return None;
2120 return oSession;
2121
2122
2123def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0):
2124 """
2125 Tries to open a connection to a Test Execution Service via TCP, given its name.
2126
2127 This differs from openTcpSession in that it won't log a connection failure
2128 as an error.
2129 """
2130 try:
2131 oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
2132 oSession = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True);
2133 except:
2134 reporter.errorXcpt(None, 15);
2135 return None;
2136 return oSession;
2137
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette