# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""A win32event based implementation of the Twisted main loop.

This requires win32all or ActivePython to be installed.

API Stability: semi-stable

Maintainer: U{Itamar Shtull-Trauring}

LIMITATIONS:
 1. WaitForMultipleObjects and thus the event loop can only handle 64 objects.
 2. Process running has some problems (see Process docstring).

TODO:
 1. Event loop handling of writes is *very* problematic (this is causing
    failed tests). Switch to doing it the correct way, whatever that means
    (see below).
 2. Replace icky socket loopback waker with event based waker (use
    dummyEvent object)
 3. Switch everyone to using Free Software so we don't have to deal with
    proprietary APIs.

ALTERNATIVE SOLUTIONS:
 - IIRC, sockets can only be registered once. So we switch to a structure
   like the poll() reactor, thus allowing us to deal with write events in
   a decent fashion. This should allow us to pass tests, but we're still
   limited to 64 events.

Or:

 - Instead of doing a reactor, we make this an addon to the default reactor.
   The WFMO event loop runs in a separate thread. This means no need to
   maintain separate code for networking, 64 event limit doesn't apply to
   sockets, we can run processes and other win32 stuff in default event
   loop. The only problem is that we're stuck with the icky socket based
   waker. Another benefit is that this could be extended to support >64
   events in a simpler manner than the previous solution.

The 2nd solution is probably what will get implemented.
"""

# Win32 imports
from win32file import WSAEventSelect, FD_READ, FD_WRITE, FD_CLOSE, \
     FD_ACCEPT, FD_CONNECT
from win32event import CreateEvent, MsgWaitForMultipleObjects, \
     WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE, QS_ALLINPUT, QS_ALLEVENTS
import win32api
import win32con
import win32event
import win32file
import win32pipe
import win32process
import win32security
import pywintypes
import msvcrt
import win32gui

# Twisted imports
from twisted.internet import abstract, default, main, error
from twisted.python import log, threadable, failure
from twisted.internet.interfaces import IReactorFDSet

# System imports
import os
import threading
import Queue
import string
import time
import sys


# globals
# reads:  maps a reader to the win32 event created for its socket
# writes: set-like dict of writers that still want doWrite() called
# events: maps a win32 event/handle to a (fd, action) pair
reads = {}
writes = {}
events = {}


class Win32Reactor(default.PosixReactorBase):
    """Reactor that uses Win32 event APIs."""

    __implements__ = (default.PosixReactorBase.__implements__, IReactorFDSet)

    # Waited on when no real events are registered, so that the handle list
    # passed to MsgWaitForMultipleObjects is never empty.
    dummyEvent = CreateEvent(None, 0, 0, None)

    def _makeSocketEvent(self, fd, action, why, events=events):
        """Make a win32 event object for a socket.

        The new event is associated with C{fd} for the network events in the
        C{why} bitmask and registered so that C{action} is called when it
        fires. Returns the event.
        """
        event = CreateEvent(None, 0, 0, None)
        WSAEventSelect(fd, event, why)
        events[event] = (fd, action)
        return event

    def addEvent(self, event, fd, action, events=events):
        """Add a new win32 event to the event loop.

        C{action} is called with no arguments whenever C{event} is signalled;
        C{fd} is the object that gets disconnected if C{action} fails.
        """
        events[event] = (fd, action)

    def removeEvent(self, event):
        """Remove an event.

        Raises C{KeyError} if C{event} is not currently registered.
        """
        del events[event]

    def addReader(self, reader, reads=reads):
        """Add a socket FileDescriptor for notification of data available to read.
        """
        if not reads.has_key(reader):
            reads[reader] = self._makeSocketEvent(
                reader, reader.doRead,
                FD_READ|FD_ACCEPT|FD_CONNECT|FD_CLOSE)

    def addWriter(self, writer, writes=writes):
        """Add a socket FileDescriptor for notification of data available to write.
        """
        # Writers have no win32 event; they are polled on every iteration.
        if not writes.has_key(writer):
            writes[writer] = 1

    def removeReader(self, reader):
        """Remove a Selectable for notification of data available to read.
        """
        if reads.has_key(reader):
            del events[reads[reader]]
            del reads[reader]

    def removeWriter(self, writer, writes=writes):
        """Remove a Selectable for notification of data available to write.
        """
        if writes.has_key(writer):
            del writes[writer]

    def removeAll(self):
        """Remove all selectables, and return a list of them."""
        result = reads.keys() + writes.keys()
        reads.clear()
        writes.clear()
        events.clear()
        return result

    def _runAction(self, fd, action):
        """Call C{action} on behalf of C{fd}, disconnecting C{fd} on failure.

        Any exception raised by C{action} is logged and treated as the reason
        for the disconnection. Returns whatever C{action} returned, or the
        exception value if it raised.
        """
        closed = 0
        log.logOwner.own(fd)
        try:
            closed = action()
        except:
            # Deliberately broad: one misbehaving fd must not kill the loop.
            closed = sys.exc_value
            log.deferr()
        if closed:
            self.removeReader(fd)
            self.removeWriter(fd)
            try:
                fd.connectionLost(failure.Failure(closed))
            except:
                log.deferr()
        log.logOwner.disown(fd)
        return closed

    def doWaitForMultipleEvents(self, timeout, reads=reads, writes=writes):
        """Run one iteration of the event loop.

        C{timeout} is in seconds, or C{None}. All registered writers get
        their C{doWrite} called, then at most one signalled event (or the
        pending window messages) is dispatched.
        """
        if timeout is None:
            # INFINITE is intentionally not used here; wake up at least
            # every five seconds instead.
            #timeout = INFINITE
            timeout = 5000
        else:
            timeout = int(timeout * 1000)

        if not (events or writes):
            # sleep so we don't suck up CPU time
            time.sleep(timeout / 1000.0)
            return

        canDoMoreWrites = 0
        for fd in writes.keys():
            # A doWrite() result of None means the writer has more to send.
            if self._runAction(fd, fd.doWrite) is None:
                canDoMoreWrites = 1

        if canDoMoreWrites:
            # Don't block: come straight back to continue writing.
            timeout = 0

        handles = events.keys() or [self.dummyEvent]
        val = MsgWaitForMultipleObjects(handles, 0, timeout,
                                        QS_ALLINPUT | QS_ALLEVENTS)
        if val == WAIT_TIMEOUT:
            return
        elif val == WAIT_OBJECT_0 + len(handles):
            # Index one past the last handle means window messages arrived.
            quitRequested = win32gui.PumpWaitingMessages()
            if quitRequested:
                self.callLater(0, self.stop)
                return
        elif val >= WAIT_OBJECT_0 and val < WAIT_OBJECT_0 + len(handles):
            fd, action = events[handles[val - WAIT_OBJECT_0]]
            self._runAction(fd, action)

    doIteration = doWaitForMultipleEvents

    def spawnProcess(self, processProtocol, executable, args=(), env={}, path=None):
        """Spawn a process."""
        Process(self, processProtocol, executable, args, env, path)


def install():
    """Create a Win32Reactor and install it as the Twisted reactor."""
    threadable.init(1)
    r = Win32Reactor()
    main.installReactor(r)


class Process(abstract.FileDescriptor):
    """A process that integrates with the Twisted event loop.

    Issues:

     - stdin close is actually signalled by process shutdown, which is wrong.
       Solution is to register stdin pipe with event loop and check for the
       correct event type - this needs to be implemented.

    If your subprocess is a python program, you need to:

     - Run python.exe with the '-u' command line option - this turns on
       unbuffered I/O. Buffering stdout/err/in can cause problems, see e.g.
       http://support.microsoft.com/default.aspx?scid=kb;EN-US;q1903

     - If you don't want Windows messing with data passed over
       stdin/out/err, set the pipes to be in binary mode::

        import os, sys, msvcrt
        msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
        msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY)

    """

    buffer = ''

    # True while self.hProcess is registered with the reactor's event loop.
    _processEventRegistered = 0

    def __init__(self, reactor, protocol, command, args, environment, path):
        """Start C{command} and connect its stdin/stdout/stderr to C{protocol}.

        C{args[0]} is ignored; the remaining arguments are joined with spaces
        (without any quoting) and appended to C{command}.
        """
        self.reactor = reactor
        self.protocol = protocol

        # security attributes for pipes
        sAttrs = win32security.SECURITY_ATTRIBUTES()
        sAttrs.bInheritHandle = 1

        # create the pipes which will connect to the secondary process
        self.hStdoutR, hStdoutW = win32pipe.CreatePipe(sAttrs, 0)
        self.hStderrR, hStderrW = win32pipe.CreatePipe(sAttrs, 0)
        hStdinR,  self.hStdinW  = win32pipe.CreatePipe(sAttrs, 0)

        # set the info structure for the new process.
        StartupInfo = win32process.STARTUPINFO()
        StartupInfo.hStdOutput = hStdoutW
        StartupInfo.hStdError  = hStderrW
        StartupInfo.hStdInput  = hStdinR
        StartupInfo.dwFlags = win32process.STARTF_USESTDHANDLES

        # Create new handles whose inheritance property is false, so the
        # child does not inherit our ends of the pipes.
        self.hStdoutR = self._dupNonInheritable(self.hStdoutR)
        self.hStderrR = self._dupNonInheritable(self.hStderrR)
        self.hStdinW = self._dupNonInheritable(self.hStdinW)

        # create the process
        cmdline = "%s %s" % (command, string.join(args[1:], ' '))
        self.hProcess, hThread, dwPid, dwTid = win32process.CreateProcess(
            None, cmdline, None, None, 1, 0, environment, path, StartupInfo)

        # the primary thread handle is never used
        win32file.CloseHandle(hThread)

        # close handles which only the child will use
        win32file.CloseHandle(hStderrW)
        win32file.CloseHandle(hStdoutW)
        win32file.CloseHandle(hStdinR)

        self.outQueue = Queue.Queue()
        self.closed = 0
        self.closedNotifies = 0

        # notify protocol
        self.protocol.makeConnection(self)

        self._processEventRegistered = 1
        self.reactor.addEvent(self.hProcess, self, self.inConnectionLost)
        threading.Thread(target=self.doWrite).start()
        threading.Thread(target=self.doReadOut).start()
        threading.Thread(target=self.doReadErr).start()

    def _dupNonInheritable(self, handle):
        """Replace C{handle} with a non-inheritable duplicate.

        The original handle is closed; the duplicate is returned.
        """
        currentProcess = win32api.GetCurrentProcess()
        duplicate = win32api.DuplicateHandle(
            currentProcess, handle, currentProcess, 0, 0,
            win32con.DUPLICATE_SAME_ACCESS)
        win32file.CloseHandle(handle)
        return duplicate

    def _removeProcessEvent(self):
        """Unregister the process handle from the reactor, at most once."""
        if self._processEventRegistered:
            self._processEventRegistered = 0
            self.reactor.removeEvent(self.hProcess)

    def write(self, data):
        """Write data to the process' stdin."""
        self.outQueue.put(data)

    def closeStdin(self):
        """Close the process' stdin."""
        # None is the sentinel that tells the writer thread to finish.
        self.outQueue.put(None)

    def closeStderr(self):
        """Close our end of the process' stderr pipe, if still open."""
        if hasattr(self, "hStderrR"):
            win32file.CloseHandle(self.hStderrR)
            del self.hStderrR

    def closeStdout(self):
        """Close our end of the process' stdout pipe, if still open."""
        if hasattr(self, "hStdoutR"):
            win32file.CloseHandle(self.hStdoutR)
            del self.hStdoutR

    def loseConnection(self):
        """Close the process' stdout, in and err."""
        self.closeStdin()
        self.closeStdout()
        self.closeStderr()

    def outConnectionLost(self):
        """Called in the reactor thread once the stdout reader has finished."""
        self.closeStdout() # in case process closed it, not us
        self.protocol.outConnectionLost()
        self.connectionLostNotify()

    def errConnectionLost(self):
        """Called in the reactor thread once the stderr reader has finished."""
        self.closeStderr() # in case process closed it, not us
        self.protocol.errConnectionLost()
        self.connectionLostNotify()

    def _closeStdin(self):
        """Close our end of the stdin pipe and wake up the writer thread."""
        if hasattr(self, "hStdinW"):
            win32file.CloseHandle(self.hStdinW)
            del self.hStdinW
            self.outQueue.put(None)

    def inConnectionLost(self):
        """Called by the event loop when the process handle is signalled."""
        # A process handle stays signalled once the process has exited, so
        # unregister it right away; otherwise the event loop would call this
        # method again on every iteration and over-count closedNotifies.
        self._removeProcessEvent()
        self._closeStdin()
        self.protocol.inConnectionLost()
        self.connectionLostNotify()

    def connectionLostNotify(self):
        """Will be called 3 times, by stdout/err threads and process handle."""
        self.closedNotifies = self.closedNotifies + 1
        if self.closedNotifies == 3:
            self.closed = 1
            self.connectionLost()

    def connectionLost(self, reason=None):
        """Shut down resources."""
        exitCode = win32process.GetExitCodeProcess(self.hProcess)
        self._removeProcessEvent()
        abstract.FileDescriptor.connectionLost(self, reason)
        if exitCode == 0:
            err = error.ProcessDone()
        else:
            err = error.ProcessTerminated(exitCode)
        self.protocol.processEnded(failure.Failure(err))

    def doWrite(self):
        """Runs in thread.

        Writes queued data to the process' stdin until the C{None} sentinel
        is dequeued or the pipe can no longer be written to.
        """
        while 1:
            data = self.outQueue.get()
            if data is None:
                break
            try:
                win32file.WriteFile(self.hStdinW, data, None)
            except (win32api.error, AttributeError):
                # AttributeError: hStdinW was already closed and deleted
                # by _closeStdin().
                break

        self._closeStdin()

    def _readPipe(self, handleName, received, lost):
        """Pump one output pipe of the process; runs in a thread.

        C{handleName} is the name of the attribute holding the pipe handle.
        Each chunk read is passed to C{received}, and C{lost} is called once
        reading is finished; both calls are made via reactor.callFromThread.
        """
        while 1:
            try:
                finished = 0
                # Looked up every time round, since the close methods delete
                # the attribute.
                handle = getattr(self, handleName)
                peeked, bytesToRead, result = win32pipe.PeekNamedPipe(handle, 1)
                finished = (result == -1) and not bytesToRead
                if bytesToRead == 0 and result != -1:
                    # Nothing buffered yet; ask for one byte so that
                    # ReadFile presumably blocks until data arrives.
                    bytesToRead = 1
                hr, data = win32file.ReadFile(handle, bytesToRead, None)
            except (win32api.error, AttributeError):
                # The pipe is broken, or we closed (and deleted) the handle.
                finished = 1
            else:
                self.reactor.callFromThread(received, data)
            if finished:
                self.reactor.callFromThread(lost)
                return

    def doReadOut(self):
        """Runs in thread."""
        self._readPipe("hStdoutR", self.protocol.outReceived,
                       self.outConnectionLost)

    def doReadErr(self):
        """Runs in thread."""
        self._readPipe("hStderrR", self.protocol.errReceived,
                       self.errConnectionLost)


__all__ = ["Win32Reactor", "install"]