## @package SocketClient
# Extended Socket with handling of a message Queue and notifications to registered listeners.
#
# -----------------------------------------------------------------------------------
#
# Copyright (c) 2009 Neobotix (www.neobotix.de)
#
# This software is allowed to be used and modified only in association with a Neobotix
# robot platform. It is allowed to include the software into applications and
# to distribute it with a Neobotix robot platform.
#
# This software is provided WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the Neobotix License (Version 1.0) for more details.
#
# You should have received a copy of the Neobotix License
# along with this software; if not, write to the
# Gesellschaft fuer Produktionssysteme, Neobotix, Nobelstrasse 12, 70569 Stuttgart, Germany
#
# -----------------------------------------------------------------------------------

from Neobotix.Bases.ByteArray import ByteArray
from Message import Message
from Queue import Queue, Full, Empty
from threading import Thread
import socket
import time


class SocketClient(Thread, socket.socket):
    """
    TCP socket that runs in its own daemon thread and drains a send queue.

    Messages queued with addToSendQueue() are wrapped into a telegram
    (NEO_MSG_HEADER, payload length, payload) and written to the socket by
    the thread. When a sent message reports getHasReply(), one reply telegram
    is read back and handed to the registered listeners.

    Listeners are called from the socket thread and are expected to provide
    onConnect(), onReceive(msg) and onDisconnect().
    """

    # Capacity of the outgoing queue; addToSendQueue() fails once it is full.
    __sendQueueMaxSize = 10
    # Marker bytes that start every telegram, in both directions.
    NEO_MSG_HEADER = ByteArray([255, 1, 254, 2])

    def __init__(self):
        """
        Create an unconnected IPv4 stream socket and its (not yet started)
        daemon worker thread.
        """
        # Init socket parent class
        socket.socket.__init__(self, socket.AF_INET, socket.SOCK_STREAM)
        Thread.__init__(self, None, None, 'SocketClientThread')
        self.setDaemon(True)
        # Init queues
        self.sendQueue = Queue(self.__sendQueueMaxSize)
        # Listeners
        self.__connectionListeners = []
        self.__connected = False

    def addToSendQueue(self, msg):
        """
        Queue a message for transmission by the socket thread.

        @param msg: message to send; run() uses len(msg) and
                    msg.getHasReply() on it.
        @return: True if the message was queued, False if the client is not
                 connected or the queue is full.
        """
        if not self.__connected:
            return False
        try:
            self.sendQueue.put_nowait(msg)
        except Full:
            print("[SocketClient] Send Queue Overflow")
            return False
        return True

    def connect(self, address, port):
        """
        Remember the remote endpoint and start the socket thread.

        The actual connection attempt happens asynchronously in run(); use
        isConnected() or a listener's onConnect() to learn the outcome.

        @param address: remote host passed on to socket.connect().
        @param port: remote port passed on to socket.connect().
        """
        self.__address = address
        self.__port = port
        self.start()

    def isConnected(self):
        """Return True while the socket thread considers the link up."""
        return self.__connected

    def run(self):
        """
        Thread body: connect, then service the send queue until the
        connection fails.

        Any socket.error raised while sending or receiving marks the client
        as disconnected, closes the socket and notifies the listeners via
        onDisconnect(). If the connection is never established the loop is
        skipped and no listener is notified.
        """
        self.__connectWithRetry()

        while self.isConnected():
            try:
                self.__serviceSendQueue()
                # Yield
                time.sleep(0.05)
            except socket.error:
                self.__connected = False
                self.close()
                # Notify
                for listener in self.__connectionListeners:
                    listener.onDisconnect()

    def addConnectionListener(self, listener):
        """Register a listener for connect/receive/disconnect notifications."""
        self.__connectionListeners.append(listener)

    def removeConnectionListener(self, listener):
        """
        Unregister a previously added listener.

        Raises ValueError (from list.remove) if the listener is not
        registered.
        """
        self.__connectionListeners.remove(listener)

    def __connectWithRetry(self):
        """
        Try to connect up to two times, waiting one second after a failure.

        Listeners are notified with onConnect() on success. An exception
        raised by a listener is treated like a failed attempt, exactly as a
        connection error is.
        """
        for _attempt in range(2):
            try:
                socket.socket.connect(self, (self.__address, self.__port))
                self.__connected = True
                # Notify
                for listener in self.__connectionListeners:
                    listener.onConnect()
                break
            except Exception:
                self.__connected = False
                time.sleep(1)

    def __serviceSendQueue(self):
        """
        Send at most one queued message and, if it expects one, read its
        reply. Does nothing when the queue is empty.
        """
        try:
            msg = self.sendQueue.get_nowait()
        except Empty:
            return

        # Building a telegram with Neo format: header, size, content
        telegram = ByteArray()
        telegram.extend(self.NEO_MSG_HEADER)
        telegram.writeInt(len(msg))
        telegram.extend(msg)
        # sendall() keeps writing until the whole telegram is out; a plain
        # send() may transmit only a prefix and silently drop the rest.
        self.sendall(telegram)

        if msg.getHasReply():
            self.__receiveReply()

    def __receiveReply(self):
        """
        Read one reply telegram and dispatch it to the listeners.

        The reply starts with 8 bytes: four marker bytes followed by the
        payload size as read by ByteArray.readInt(). The payload is consumed
        even when the marker does not match NEO_MSG_HEADER; in that case the
        reply is dropped and a warning is printed.
        """
        header = ByteArray(self.__recvExact(8))
        marker = [header.readByte() for _ in range(4)]
        size = header.readInt()
        valid = all(marker[i] == self.NEO_MSG_HEADER[i]
                    for i in range(len(self.NEO_MSG_HEADER)))

        data = ByteArray(self.__recvExact(size))

        if valid:
            reply = Message()
            reply.extend(data)
            reply.setMessageType(data[0])
            for listener in self.__connectionListeners:
                listener.onReceive(reply)
        else:
            print("[SocketClient] Wrong Header")

    def __recvExact(self, count):
        """
        Receive exactly count bytes from the socket.

        Only the outstanding number of bytes is requested on each pass, so a
        short read never consumes data belonging to the next telegram.

        @param count: number of bytes to read; values <= 0 yield "".
        @return: the received bytes as a string.
        @raise socket.error: if the peer closes the connection before count
                             bytes arrived (recv() returned no data).
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.recv(remaining)
            if not chunk:
                # An empty result means the peer closed the connection;
                # without this check the loop would spin forever.
                raise socket.error("connection closed by peer")
            chunks.append(chunk)
            remaining -= len(chunk)
        return "".join(chunks)