'''
Terms and conditions
--------------------
Copywrite 2009 Jim Nilsson.
PyCAN is distributed under the terms of the GNU Lesser General Public License.
This file is part of the PyCAN Libary.
PyCAN is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
PyCAN is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU General Public License and
GNU Lesser General Public License along with PyCAN.
If not, see <http://www.gnu.org/licenses/>.
--------------------------------------------------------------------------------------------------
'''
#IMPORTS
import re
import platform
import time
import ctypes
import threading
from drivers.can_vector_driver import XlDriver, s_xl_event
#MODULE CONSTANTS
XL_INVALID_PORTHANDLE = -1
XL_INTERFACE_VERSION = 3
XL_ACTIVATE_RESET_CLOCK = 8
XL_NO_INIT_ACCESS = 0
XL_SUCCESS = 0
XL_ERR_QUEUE_IS_EMPTY = 10
XL_NO_COMMAND = 0
XL_RECEIVE_MSG = 1
XL_CHIP_STATE = 4
XL_TRANSCEIVER = 6
XL_TIMER = 8
XL_TRANSMIT_MSG = 10
XL_SYNC_PULSE = 11
XL_APPLICATION_NOTIFICATION = 15
DLC = 8
SIZE_MSG_BUFFER = 256
XLuint64 = ctypes.c_longlong
XLaccess = XLuint64
XLportHandle = ctypes.c_long
XLeventTag = ctypes.c_ubyte
XLevent = s_xl_event
XLeventList = SIZE_MSG_BUFFER*XLevent
[docs]class CanHwChannel():
"""
**Description this is the generall hardware CAN channel class. It contains all information about the used hardware
to be able to open a channel for commuication over CAN.
"""
def __init__(self):
self.__create()
def __create(self):
self.__hardware = None
self.__dllVersion = None
self.__numberOfChannels = None
self.__channelName = None
self.__hardwareType = None
self.__hardwareIndex = None
self.__hardwareChannel = None
self.__transceiverType = None
self.__transceiverState = None
self.__channelIndex = None
self.__channelMask = None
self.__isOnBus = None
self.__connectedBusType = None
self.__hardwareSerialNumber = None
self.__hardwareArticleNumber = None
self.__transceiverName = None
self.__portHandle = None
self.__accessMask = None
def __setHardware(self, value):
self.__hardware = value
def __getHardware(self):
return self.__hardware
def __setDllversion(self, value):
self.__dllVersion = value
def __getDllversion(self):
return self.__dllVersion
def __setNumberOfChannels(self, value):
self.__numberOfChannels = value
def __getNumberOfChannels(self):
return self.__numberOfChannels
def __setChannelName(self, value):
self.__channelName = value
def __getChannelName(self):
return self.__channelName
def __setHardwareType(self, value):
self.__hardwareType(value)
def __getHardwareType(self):
return self.__hardwareType
def __setHardwareIndex(self, value):
self.__hardwareIndex(value)
def __getHardwareIndex(self):
return self.__hardwareIndex
def __setHardwareChannel(self, value):
self.__hardwareChannel(value)
def __getHardwareChannel(self):
return self.__hardwareChannel
def __setTransceiverType(self, value):
self.__transceiverType(value)
def __getTransceiverType(self):
return self.__transceiverType
def __setTransceiverState(self, value):
self.__transceiverState(value)
def __getTransceiverState(self):
return self.__transceiverState
def __setChannelIndex(self, value):
self.__channelIndex(value)
def __getChannelIndex(self):
return self.__channelIndex
def __setChannelMask(self, value):
self.__channelMask(value)
def __getChannelMask(self):
return self.__channelMask
def __setIsOnBus(self, value):
self.__isOnBus(value)
def __getIsOnBus(self):
return self.__isOnBus
def __getConnectedBusType(self, value):
self.__connectedBusType = value
def __setConnectedBusType(self):
return self.__connectedBusType
def __setHardwareSerialNumber(self, value):
self.__hardwareSerialNumber(value)
def __getHardwareSerialNumber(self):
return self.__hardwareSerialNumber
def __setHardwareArticleNumber(self, value):
self.__hardwareArticleNumber(value)
def __getHardwareArticleNumber(self):
return self.__hardwareArticleNumber
def __setTransceiverName(self, value):
self.__transceiverName(value)
def __getTransceiverName(self):
return self.__transceiverName
def __setPortHandel(self, value):
self.__portHandle = value
def __getPortHandel(self):
return self.__portHandle
def __setAccessMask(self, value):
self.__accessMask = value
def __getAccessMask(self):
return self.__accessMask
hardware = property(__getHardware, __setHardware)
dllVersion = property(__getDllversion, __setDllversion)
numberOfChannels = property(__getNumberOfChannels, __setNumberOfChannels)
channelName = property(__getChannelName, __setChannelName)
hardwareType = property(__getHardwareType, __setHardwareType)
hardwareIndex = property(__getHardwareIndex, __setHardwareIndex)
hardwareChannel = property(__getHardwareChannel, __setHardwareChannel)
transceiverType = property(__getTransceiverType, __setTransceiverType)
transceiverState = property(__getTransceiverState, __setTransceiverState)
channelIndex = property(__getChannelIndex, __setChannelIndex)
channelMask = property(__getChannelMask, __setChannelMask)
isOnBus = property(__getIsOnBus, __setIsOnBus)
connectedBusType = property(__getConnectedBusType, __setConnectedBusType)
hardwareSerialNumber = property(__getHardwareSerialNumber, __setHardwareSerialNumber)
hardwareArticleNumber = property(__getHardwareArticleNumber, __setHardwareArticleNumber)
transceiverName = property(__getTransceiverName, __setTransceiverName)
portHandel = property(__getPortHandel, __setPortHandel)
accessMask = property(__getAccessMask, __setAccessMask)
[docs]class CanMsg():
"""
This is the general CAN Msg class. Used to store CAN messages.
"""
def __init__(self):
"""
Init method
"""
self.create()
return
def __create(self):
self.__id = None #Frame MSG ID
self.__flags = None #Flags from tranciver
self.__dlc = None #Data length code
self.__res1 = None #Only used by Vector HW
self.__timeStamp = None #Actual timestamp generated by the hardware (for vectore HW Value is in nanoseconds).
self.__data = None #Payload i.e. the transmitted / recived data
def __setId(self, value):
self.__id = value
def __getId(self):
return self.__id
def __setFlags(self, value):
self.__flags = value
def __getFlags(self):
return self.__flags
def __setDlc(self, value):
self.__dlc = value
def __getDlc(self):
return self.__dlc
def __setRes1(self, value):
self.__res1 = value
def __getRes1(self):
return self.__res1
def __setData(self, value):
self.__data = value
def __getData(self):
return self.__data
def __setTimeStamp(self, value):
self.__timeStamp = value
def __getTimeStamp(self):
return self.__timeStamp
id = property(__getId, __setId)
flags = property(__getFlags, __setFlags)
dlc = property(__getDlc, __setDlc)
res1 = property(__getRes1, __setRes1)
data = property(__getData, __setData)
timeStamp = property(__getTimeStamp, __setTimeStamp)
[docs]class CanMsgHolder():
"""
**Description:** The Can Msg Holder class is used to hold CAN frames
from a specific CAN ID. The CAN ID correlates to the MSG ID in the
DBC-file that can be viewed and used together with vectors CANAlyzer.
It has a frame buffer that is the size of SIZE_MSG_BUFFER same as that
the HW buffer is initialized to.
"""
def __init__(self):
"""
Internal method.
"""
self.__bufferSize = SIZE_MSG_BUFFER
self.__msgBuffer = []
self.__latestTimeStamp = None
[docs] def addMsgToBuffer(self, msgIn):
"""
**Description:** Adds a CAN message to the messaged buffer.
:INPUT:
:param msgIn: The CAN msg that will be added to the buffer
:type msgIn: XLevent
**Detailed Description:** The msg buffer is a list and each candrivers
a new msg is added to the msg buffer its added to the end of the
list. If the msg buffer is full the last entry of the msg buffer is
disregarded and the new msg is added in its place.
"""
if len(self.__msgBuffer) == SIZE_MSG_BUFFER:
self.__msgBuffer.pop()
self.__msgBuffer.append(msgIn)
return
[docs] def getMsgFromBuffer(self):
"""
**Not yet implemented**
"""
pass
[docs] def getLatestMsgFromBuffer(self):
"""
**Description:** Retrieves the latest added msg in the msg buffer.
:OUTPUT:
:param msg: The latest msg in the msg buffer
:type msg: XLevent
"""
if len(self.__msgBuffer) != 0:
msg = self.__msgBuffer.pop()
return msg
[docs] def getOldestMsgFromBuffer(self):
"""
**Description:** Retrieves the oldest added msg in the msg buffer.
:OUTPUT:
:param msg: The oldest msg in the msg buffer
:type msg: XLevent
"""
if len(self.__msgBuffer) != 0:
msg = self.__msgBuffer.pop(0)
return msg
[docs] def getNumberOfMsgInBuffer(self):
"""
**Description:** Retrieves the number of msg contained in the msg
buffer.
:OUTPUT:
:param lengthMsgBuffer: The number of msg currently in the msg buffer
:type lengthMsgBuffer: Integer
"""
lengthMsgBuffer = len(self.__msgBuffer)
return lengthMsgBuffer
[docs] def clearBuffer(self):
"""
**Description:** Clears the msg buffer of all messages
"""
self.__msgBuffer = []
def __printMsgBuffer(self):
"""
Internal method.
Used to print out what is currently contained in the msg buffer
onto the standard output
"""
msgNumber = 0
timeStamp = None
print "---------------------------------------------------------------"
for msg in self.__msgBuffer:
tmpResponse = []
timeStamp = msg.timeStamp
for i in range(8):
tmpResponse.append(msg.tagData.msg.data[i])
print "[ MSG NR: %d ]"%msgNumber
print "[ TIMESTAMP: %d ]"%timeStamp
print "- MSG: \t%s\n" %self.__li2ls(tmpResponse)
msgNumber += 1
print "---------------------------------------------------------------"
def __li2ls(self, liIn, iBaseIn = 16, iMinNrOfChars = 2 ):
"""
Internal method.
takes a list of integers and returns a list of strings.
"""
if( iBaseIn == 16 ):
sConv = "%0*X"
else:
sConv = "%0*d"
ls = []
for iData in liIn:
ls.append( sConv % (iMinNrOfChars,iData) )
return ls
[docs]class CanHw():
"""
**Description:** This is the CANHW class that handles all the CAN
communication for the library BeeJay. The class can be used stand alone
to send an receive CAN messages on CAN.
It uses the Vector GmbH XL Driver Library to interface with all
of Vectors supported hardware.
"""
ERROR_MSG = \
{
0 : '[- General error -]',
1 : '[- Not yet implemented -]',
2 : '[- Not supported yet -]',
3 : '[- No hardware selected -]',
4 : '[- No hardware present -]',
5 : '[- No availble channels to select.-]',
6 : '[- Not yet supported please choose none interactive mode i.e. interactive = False -]',
7 : '[- All channels occupied or no hardware present. Please make atleast one channel available -]',
8 : '[- -]'
}
def __init__(self):
"""
Internal method.
"""
self.create()
return
"""
**The methods below are common methods for all HW
"""
[docs] def create(self):
"""
**Description:** The methods is called automatically when a instance
of the class is created by being called from the __init__ method. It
creates and initilizes all the global variables for the class.
"""
#NEW
self.__channelList = [] # A list containing all available channes from each connected HW
self.__selectedChannel = None # The selcted HW channel to transmitt and recvive messages from
self.__msgHolderList = {} # Container of all recived CAN mesages
self.__commonResourceThreadLock = threading.Lock() # Thred lock for common resources
self.__threadFrequency = 0.005 # Recive thred frequency
#OLD
self.__xlDriverConfig = None
self.__hwTypeList = None
self.__hwIndexList = None
self.__canChannel = None
self.__portHandle = None
self.__permissionMask = None
self.__txListenerThread = None
self.__msgBufferListTx = None
self.__msgBufferListRx = None
self.__canLoggin = None
self.__canLoggFile = None
self.__isOnBus = False
return
[docs] def setup(self, fileIn = None, canLoggingOnOff = False, hardwareType = None, interactive = False):
"""
**Description:** The setup method is used to setup the CAN Hardware
with a available channel to send and receive from.
:INPUT:
:param fileIn: The file location of the vxlapi.dll file for the vectors XL Library.
:type fileIn: string
:param canLoggingOnOff: Turn on or off logging of the CAN traffic.
:type canLoggingOnOff: Bool
**Detailed Description:** N/A
"""
if(hardwareType == 'Vector'):
self.__setupVectorHw(fileIn, interactive)
elif(hardwareType == 'Kvaser'):
print self.ERROR_MSG[2]
elif(hardwareType == 'Peak'):
print self.ERROR_MSG[2]
else:
print self.ERROR_MSG[3]
return
#TODO fix independent logging from diffrent CAN HW soruces
if canLoggingOnOff:
self.__canLoggin = canLoggingOnOff
currentDate = time.strftime("%Y-%m-%d")
currentTime = time.strftime("%H-%M-%S")
canLoggFileName = "canLogg_"+"Date_"+currentDate+"_Time_"+currentTime+".txt"
self.__canLoggFile = open(canLoggFileName, "w")
loggFileHeader = "[ - TIMESTAMP - ]\t [ - ID - ]\t\t\t [ - DATA - ]\n"
self.__canLoggFile.write(loggFileHeader)
return
[docs] def manualSetup(self, fileIn = None):
"""
**Description:** Setup the CAN hardware manually
**Not yet implemented**
"""
pass
[docs] def request(self):
"""
**Description:** The request method opens a selected channel and
activates it.
**Detailed Description:** The request class needs to be called after
the setup method and takes the requested channel opens and
activates it for sending and receiving messages onto CAN. It will
exit if no channel is availeble.
Once the channel is open and activated the listener thread is started
and it in it's turn starts to collect CAN messages from CAN. The
thread is executed periodically according to the "threadFrequency"
that is setup in the create method.
"""
if self.__canChannel == None:
print "No CAN Channel chosen to open \n"
return
else:
if not self.__openChannel():
print "Port could not be opened already initialized \n"
return
if not self.__activateChannel():
print "Could not activate channel \n"
return
text = "Opened and activated [ %s ] \n" %self.__canChannel.name
print text
time.sleep(0.005) #Allows the MSG Queue to fill up with messages
self.__rxListenerThreadStart()
[docs] def sendOneCanMsg(self, msgId, msgIn):
"""
**Description:** Sends a CAN msg out on the CAN network.
:INPUT:
:param msgId: The MSG ID of the msg to send out onto CAN.
:type msgId: Hex
:param msgIn: The MSG to send out on to the CAN network.
:type msgIn: List of Hex
**Detailed Description:** The CAN msg need to be formated as below
illustrated for this method to work. First byte is always reserved for
the number of bytes in the CAN msg. The method returns true of
successful otherwise returns false.
.. code:: python
msgListIn = [0x02, 0xF1, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00]
:OUTPUT:
:param return: Returns true if successful otherwise returns false
:type return: Bool
"""
if self.__isOnBus == None:
print "No CAN Channel open for transmit aborting request \n"
return False
self.__msgBufferListTx = XLeventList()
messageCount = ctypes.c_uint(1)
portHandle = self.__portHandle
accessMask = self.__permissionMask
self.__msgBufferListTx[0].tag = XL_TRANSMIT_MSG
self.__msgBufferListTx[0].tagData.msg.id = msgId
self.__msgBufferListTx[0].tagData.msg.dlc = DLC
for byte in range(len(msgIn)):
self.__msgBufferListTx[0].tagData.msg.data[byte] = msgIn[byte]
self.oXLDRIVER.xlCanTransmit(portHandle, accessMask, messageCount \
,self.__msgBufferListTx)
self.__msgBufferListTx = None
return True
[docs] def sendMultipleCanMsg(self):
"""
**Description:** Sending multiple CAN MSG
**Not yet implemented**
"""
pass
[docs] def reciveCanMsg(self):
"""
**Description:** This method is used to grab all messaged that are in
the hardware messaged buffer in the CAN hardware.
The following code takes the messages grabbed from the HW messaged
buffer and adds it to the appropriate msg holder object's msg buffer.
If no msg holder object exists for that specific msg ID one is created
and the messaged is added to it's msg buffer. This ensourse that each
msg sent on CAN has its own msg holder object and its own msg buffer the
size of MAX_MSG_BUFFER_SIZE.
.. code:: python
for msg in self.__msgBufferListRx:
try:
oFrameHolder = self.__msgHolderList[msg.tagData.msg.id]
except:
oFrameHolder = CanMsgHolder()
self.__msgHolderList[msg.tagData.msg.id] = oFrameHolder
oFrameHolder.addMsgToBuffer(msg)
This method is also called by the thread *"rxListenerThread"*
periodically according to the *"threadFrequency"* defined in the create
method.
"""
if self.__isOnBus == False:
print "No CAN Channel open for transmit aborting request \n"
return False
self.__msgBufferListRx = XLeventList()
portHandle = self.__portHandle
pEventCount = ctypes.c_uint(SIZE_MSG_BUFFER)
xlStatus = self.oXLDRIVER.xlReceive(portHandle, pEventCount, self.__msgBufferListRx)
self.__commonResourceThreadLock.acquire()
if xlStatus != XL_ERR_QUEUE_IS_EMPTY:
for msg in self.__msgBufferListRx:
try:
oFrameHolder = self.__msgHolderList[msg.tagData.msg.id]
except:
oFrameHolder = CanMsgHolder()
self.__msgHolderList[msg.tagData.msg.id] = oFrameHolder
oFrameHolder.addMsgToBuffer(msg)
if self.__canLoggin:
if msg.tagData.msg.id != 0x00:
self.__canlogger(msg)
else:
return
self.__commonResourceThreadLock.release()
return
[docs] def readCanMsg(self, msgId = None, outPutInfo = False):
"""
**Description:** This method is used to read CAN MSG's form a specific
MSG ID's msg-buffer.
:INPUT:
:param msgId: The specific MSG ID to fetch CAN MSG from.
:type msgId: Hex
:param outPutInfo: Displays information text to the standards output if
put to true.
:type outPutInfo: Bool
**Detailed Description:** The msg retreived from the msg buffer of the
specific CAN MSG ID is always the first stored CAN MSG from the CAN
Network. This is because the msg que in the CanMsgHolder object is
designed as a FIFO-Que see image bellow. Once the msg is retrieved from
the buffer it is removed from the que and can't be retrieved again.
Running the method again will retrieve the next message in the que and
so forth until the que is empty.
.. figure:: images/CanMsgBuffer.png
:align: center
:OUTPUT:
:param tmpMsg: Retrieved message from the msg buffer.
:type tmpMsg: XLevent
"""
self.__commonResourceThreadLock.acquire()
if msgId == None:
print "No MSG ID given"
self.__commonResourceThreadLock.release()
return
elif (msgId not in self.__msgHolderList.keys()) and not outPutInfo:
print "No MSG from ID 0x%0.2X received...\n"%msgId
self.__commonResourceThreadLock.release()
return
elif msgId in self.__msgHolderList.keys():
tmpMsgHolder = self.__msgHolderList[msgId]
tmpMsg = tmpMsgHolder.getOldestMsgFromBuffer()
self.__commonResourceThreadLock.release()
return tmpMsg
self.__commonResourceThreadLock.release()
return
[docs] def release(self):
"""
**Description:** This method is used to release the requested
resources.
"""
if self.__portHandle == None:
return
self.__rxListenerThreadStop()
self.__deactivateChannel()
self.__closeChannel()
[docs] def destroy(self):
"""
**Description:** This method is used to clean up all.
"""
self.__xlDriverConfig = None
self.__hwTypeList = None
self.__hwIndexList = None
self.__canChannel = None
self.__portHandle = None
self.__permissionMask = None
[docs] def printHardware(self, channelsIn = None):
"""
**Description:** This method prints the connected hardware.
:INPUT:
:param channelIn: Number of channels to print starts at zero i.e. the
first channel.
:type channelIn: Int
**Detailed Description:** If no channel is given the method the driver
scans the ports for connected hardware and prints all avalible hardware
onto the standard output.
"""
self.oXLDRIVER.xlPrintDriverConfig(channelsIn)
return
def __setupVectorHw(self, fileIn, interactive):
self.__oXLDRIVER = XlDriver()
if fileIn == None:
print "[- No path to the VectorXL driver was given -]"
if platform.machine() == 'i386':
print "[- Using default path: C:\\Users\\Public\\Documents\\Vector XL Driver Library\\bin\\vxlapi.dll -]\n"
fileIn = 'C:\\Users\\Public\\Documents\\Vector XL Driver Library\\bin\\vxlapi.dll'
elif platform.machine() == 'AMD64':
print "[- Using default path: C:\\Users\\Public\\Documents\\Vector XL Driver Library\\bin\\vxlapi64.dll -]\n"
fileIn = 'C:\\Users\\Public\\Documents\\Vector XL Driver Library\\bin\\vxlapi64.dll'
else:
print "[- Platform not recognisable -]"
print "[- No library loaded! -]"
return False
#Load library for access to the Vextor XL Driver
if not self.__oXLDRIVER.xlLoadLibrary(fileIn):
return False
#Open driver to access the internal functions of the XL driver
if not self.__oXLDRIVER.xlOpenDriver():
return False
#If interactive = True then the user will be promed via standard I/O to enter input to the setup
#If interactive = False then the software will automatically find the first avilable channel and open it
if interactive == True:
print self.ERROR_MSG[6]
else:
#Check if the software can access the driver configuration
if not self.__oXLDRIVER.xlGetDriverConfig():
return False
else:
#Get the complete driver configuration (all channels)
xlDriverConfig = self.__oXLDRIVER.xlGetDriverConfig()
#Extract each channels driver configuration and add it to a new channel object in the channelList
for channel in range(xlDriverConfig.channelCount):
tmpCanHwChannel = CanHwChannel()
tmpCanHwChannel.hardware = 'Vector'
tmpCanHwChannel.dllVersion = xlDriverConfig.dllVersion
tmpCanHwChannel.numberOfChannels = xlDriverConfig.channelCount
tmpCanHwChannel.channelName = xlDriverConfig.Channel[channel].name
tmpCanHwChannel.hardwareType = self.__oXLDRIVER.HARDWARE_TYPE_MSG[xlDriverConfig.Channel[channel].hwType]
tmpCanHwChannel.hardwareIndex = xlDriverConfig.Channel[channel].hwIndex
tmpCanHwChannel.hardwareChannel = xlDriverConfig.Channel[channel].hwChannel
tmpCanHwChannel.transceiverType = self.__oXLDRIVER.TRANSCEIVER_TYPE_MSG[xlDriverConfig.Channel[channel].transceiverType]
tmpCanHwChannel.transceiverState = xlDriverConfig.Channel[channel].transceiverState
tmpCanHwChannel.channelIndex = xlDriverConfig.Channel[channel].channelIndex
tmpCanHwChannel.channelMask = xlDriverConfig.Channel[channel].channelMask
tmpCanHwChannel.isOnBus = self.__oXLDRIVER.IS_ON_BUS_MSG[xlDriverConfig.Channel[channel].isOnBus]
tmpCanHwChannel.connectedBusType = self.__oXLDRIVER.BUS_TYPE_MSG[xlDriverConfig.Channel[channel].connectedBusType]
tmpCanHwChannel.hardwareSerialNumber = xlDriverConfig.Channel[channel].serialNumber
tmpCanHwChannel.hardwareArticleNumber = xlDriverConfig.Channel[channel].articleNumber
tmpCanHwChannel.transceiverName = xlDriverConfig.Channel[channel].transceiverName
self.__channelList.append(tmpCanHwChannel) #Add the channel to the list
#Find the first available channel that is not occupied or open.
if re.search('CAN', tmpCanHwChannel.transceiverType) and (tmpCanHwChannel.isOnBus == 'NO'):
self.__selectedChannel = tmpCanHwChannel
if self.__selectedChannel == None:
print self.ERROR_MSG[5]
print self.ERROR_MSG[7]
return False
#Open CAN selected channel
[self.__selectedChannel.portHandel, self.__selectedChannel.accessMask] = self.__oXLDRIVER.xlOpenPort(portHandle, userName, accessMask, permissionMask, rxQueueSize, xlInterfaceVersion, busType)
def __openChannel(self):
"""
Internal method.
Opens a CAN Channel for read/write access
"""
portHandle = XLportHandle(XL_INVALID_PORTHANDLE)
userName = ctypes.c_char_p('xlCANControlApp')
accessMask = XLaccess(self.__canChannel.channelMask)
permissionMask = XLaccess(self.__canChannel.channelMask)
rxQueueSize = ctypes.c_uint(SIZE_MSG_BUFFER)
xlInterfaceVersion = ctypes.c_uint(XL_INTERFACE_VERSION)
busType = ctypes.c_uint(XlDriver.BUS_TYPE['XL_BUS_TYPE_CAN'])
[portHandle, permissionMask] = self.oXLDRIVER.xlOpenPort(portHandle, \
userName, accessMask, permissionMask, \
rxQueueSize, xlInterfaceVersion, busType)
if permissionMask.value == XL_NO_INIT_ACCESS:
return False
else:
self.__portHandle = portHandle
self.__permissionMask = permissionMask
return True
def __activateChannel(self):
"""
Internal method.
Activates a designated CAN channel
"""
portHandle = self.__portHandle
accessMask = self.__permissionMask
busType = ctypes.c_uint(XlDriver.BUS_TYPE['XL_BUS_TYPE_CAN'])
flags = ctypes.c_uint(XL_ACTIVATE_RESET_CLOCK)
if not self.oXLDRIVER.xlActivateChannel(portHandle, accessMask, \
busType, flags):
return False
else:
return True
def __canlogger(self, msg):
"""
Internal method.
This method loggs all the traffic on CAN during the measurements
"""
timeStamp = msg.timeStamp
msgId = "%02X "%msg.tagData.msg.id
byte1 = "%02X "%msg.tagData.msg.data[0]
byte2 = "%02X "%msg.tagData.msg.data[1]
byte3 = "%02X "%msg.tagData.msg.data[2]
byte4 = "%02X "%msg.tagData.msg.data[3]
byte5 = "%02X "%msg.tagData.msg.data[4]
byte6 = "%02X "%msg.tagData.msg.data[5]
byte7 = "%02X "%msg.tagData.msg.data[6]
byte8 = "%02X "%msg.tagData.msg.data[7]
loggStr = "\t" + str(timeStamp) + "\t\t\t" + msgId + "\t\t" + byte1 + byte2 + byte3 + byte4 + byte5 + byte6 + byte7 + byte8 + "\n"
self.__canLoggFile.write(loggStr)
def __printCanMsgList(self, canMsgListIn = None):
"""
Internal method.
This internal method is for debug purpose only. It takes a list of
can messaged of type XLeventList and prints it to the standard
output. if no input variable is given to the method it prints all the
messages that are currently in the msgBufferListRx variable.
"""
if canMsgListIn == None:
i = 0
for x in range(SIZE_MSG_BUFFER/4):
msg1 = self.__msgBufferListRx[i]
msg2 = self.__msgBufferListRx[i+1]
msg3 = self.__msgBufferListRx[i+2]
msg4 = self.__msgBufferListRx[i+3]
print "\n--------------------------------------------------------------------------------------------"
print "[ %d ]" %x
print "Buffer Slot Nr: ", i, "\t Buffer Slot Nr: ", i+1, "\t Buffer Slot Nr: ", i+2, "\t Buffer Slot Nr: ", i+3
print "Event Tag: ", msg1.tag, "\t\t Event Tag: ", msg2.tag, "\t\t Event Tag: ", msg3.tag, "\t\t Event Tag: ", msg4.tag
print "Time Stamp: ", msg1.timeStamp, "\t Time Stamp: ", msg2.timeStamp, "\t Time Stamp: ", msg3.timeStamp, "\t Time Stamp: ", msg4.timeStamp
print "Msg Id: 0x%0.2X" %(msg1.tagData.msg.id), "\t\t Msg Id: 0x%0.2X" %(msg2.tagData.msg.id), "\t\t Msg Id: 0x%0.2X" %(msg3.tagData.msg.id), "\t\t Msg Id: 0x%0.2X" %(msg4.tagData.msg.id)
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[0]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[0]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[0]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[0])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[1]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[1]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[1]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[1])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[2]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[2]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[2]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[2])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[3]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[3]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[3]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[3])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[4]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[4]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[4]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[4])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[5]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[5]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[5]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[5])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[6]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[6]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[6]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[6])
print "Data: 0x%0.2X" %(msg1.tagData.msg.data[6]), "\t\t Data: 0x%0.2X" %(msg2.tagData.msg.data[7]), "\t\t Data: 0x%0.2X" %(msg3.tagData.msg.data[7]), "\t\t Data: 0x%0.2X" %(msg4.tagData.msg.data[7])
i = i + 4
def __rxListenerThreadStart(self):
self.__rxListenerThread = RxListenerThread(self.reciveCanMsg, \
"rxListenerThread", \
self.__threadFrequency)
self.__rxListenerThread.start()
def __rxListenerThreadStop(self):
self.__rxListenerThread.stop()
def __deactivateChannel(self):
'''
Deactivates the CAN channel
'''
if (self.__permissionMask == None) or (self.__portHandle == None):
return
portHandle = self.__portHandle
accessMask = self.__permissionMask
self.oXLDRIVER.xlDeactivateChannel(portHandle, accessMask)
def __closeChannel(self):
'''
Closes the opened CAN channel
'''
portHandle = self.__portHandle
self.oXLDRIVER.xlClosePort(portHandle)
[docs]class RxListenerThread(threading.Thread):
"""
**Description:** This class handles the thread that is assigned to listen
to the CAN network and read all the messages transmitted on the connected
CAN network.
"""
def __init__(self, methodToRunIn, threadNameIn, runFrequencyIn = None):
"""
internal method
"""
threading.Thread.__init__(self)
self.methodToRun = methodToRunIn
self.threadName = threadNameIn
self.runFrequency = runFrequencyIn
self.stopThread = False
if runFrequencyIn == None:
self.runFrequency = 0.005
[docs] def run(self):
"""
**Description:** starts the CAN listener thread with the desired
thread frequency.
"""
text = "Starting CAN listener thread with FRQ %.3f [sec]\n" %self.runFrequency
print text
while not(self.stopThread):
if self.stopThread == True:
break
self.methodToRun()
time.sleep(self.runFrequency)
text = "Stopping CAN listener thread\n"
print text
return
[docs] def stop(self):
"""
**Description:** This method stops the listener thread.
"""
self.stopThread = True
time.sleep(1)
return
if __name__ == '__main__':
"""
IMPORTAN READ THIS!
*******************
This is used to debug the module or to run the module
by it self. Pleas comment away when not in use.
"""
oCANHW = CanHw()
fileIn = 'C:\\Users\\Public\\Documents\\Vector XL Driver Library\\bin\\vxlapi64.dll'
oCANHW.setup(fileIn = None, canLoggingOnOff = False, hardwareType = 'Vector', interactive = False)
# oCANHW.request()
# oCANHW.printHardware()
# msgBufferList = []
#
# msg = [0x03,0x22,0xF1,0x86]
# nodeAdress = 0x738
#
# oCANHW.sendOneCanMsg(nodeAdress, msg)
#
#
# while(1):
# msg = oCANHW.readCanMsg(0x10)
#
# print "\n[ TIMESTAMP: %d ]"%msg.timeStamp
# print "[ BYTE 1:\t 0x%0.2X ]"%msg.tagData.msg.data[0]
# time.sleep(0.05)
# print "[ BYTE 2:\t 0x%0.2X ]"%msg.tagData.msg.data[1]
# print "[ BYTE 3:\t 0x%0.2X ]"%msg.tagData.msg.data[2]
# print "[ BYTE 4:\t 0x%0.2X ]"%msg.tagData.msg.data[3]
# print "[ BYTE 5:\t 0x%0.2X ]"%msg.tagData.msg.data[4]
# print "[ BYTE 6:\t 0x%0.2X ]"%msg.tagData.msg.data[5]
# print "[ BYTE 7:\t 0x%0.2X ]"%msg.tagData.msg.data[6]
# print "[ BYTE 8:\t 0x%0.2X ]\n"%msg.tagData.msg.data[7]
oCANHW.release()