Source code for psychopy_cedrus.cedrus

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2022 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

"""Cedrus make a variety of input devices.
See http://www.cedrus.com/

DEPRECATED:
This sub-package is out of date. Please use the cedrus-written
pyxid2 package instead (bundled with Standalone PsychoPy)::
    import pyxid2

----------
"""

from psychopy import core
import struct

try:
    import serial
except ImportError:
    serial = False


class _KeyEvent:
    """Info about a keypress from Cedrus keypad XID string
    """

    def __init__(self, XID):
        """XID should contain a "k"<info><rt> where info is a byte
        and rt is 4 bytes (=int)
        """
        super(_KeyEvent, self).__init__()
        if len(XID) != 6:
            # log.error("The XID string %s is %i bytes long and should
            #   be 6 bytes" %(str([XID]),len(XID)))
            self.key = None
        else:
            # a character and a ubyte of info
            info = struct.unpack('B', XID[1])[0]

            # was the key going down or up?
            if (info >> 4) % 2:  # this gives only the 4th bit
                self.direction = 'down'
            else:
                self.direction = 'up'

            # what was the key?
            self.key = info >> 5  # bits 5-7 give the button number

            # what was RT?
            self.rt = struct.unpack('i', XID[2:])[0]  # integer in ms


[docs] class RB730: """Class to control/read a Cedrus RB-series response box """ def __init__(self, port, baudrate=115200, mode='XID'): super(RB730, self).__init__() if not serial: raise ImportError("The module serial is needed to connect to the" " Cedrus response pad. On most systems this can" " be installed with\n\t easy_install pyserial") self.model = 'RB703' # set name of port if type(port) in [int, float]: self.portNumber = port self.portString = 'COM%i' % self.portNumber else: self.portString = port self.portNumber = None self.mode = mode # can be 'xid', 'rb', 'ascii' self.baudrate = baudrate # open the serial port self.port = serial.Serial(self.portString, baudrate=baudrate, bytesize=8, parity='N', stopbits=1, timeout=0.0001) if not self.port.isOpen(): self.port.open() # self.buffer = '' # our own buffer (in addition to the serial port # buffer) self.clearBuffer()
[docs] def sendMessage(self, message): self.port.writelines(message)
def _clearBuffer(self): """DEPRECATED as of 1.00.05 """ self.port.flushInput()
[docs] def clearBuffer(self): """Empty the input buffer of all characters. Call this to clear any keypresses that haven't yet been handled. """ self.port.flushInput()
[docs] def getKeyEvents(self, allowedKeys=(1, 2, 3, 4, 5, 6, 7), downOnly=True): """Return a list of keyEvents Each event has the following attributes: keyEvt.key is the button pressed (or released) (an int) keyEvt.rt [=float] is the time (in secs) since the rt clock was last reset (a float) keyEvt.direction is the direction the button was going ('up' or 'down') allowedKeys will limit the set of keys that are returned (WARNING: info about other keys is discarded) downOnly limits the function to report only the downward stroke of the key """ # get the raw string nToGet = self.port.inWaiting() # self.buffer += self.port.read(nToGet) # extend our own buffer (then # remove the bits we use) # extend our own buffer (then remove the bits we use) inputStr = self.port.read(nToGet) keys = [] # loop through messages for keys nKeys = inputStr.count('k') # find the "k"s for keyN in range(nKeys): start = inputStr.find('k') # find the next key stop = start + 6 # check we have that many characters(in case we read the buffer # partway through output) if len(inputStr) < stop: inputStr += self.port.read(stop - len(inputStr)) keyString = inputStr[start:stop] keyEvt = _KeyEvent(XID=keyString) if keyEvt.key not in allowedKeys: continue # ignore this keyEvt and move on if (downOnly == True and keyEvt.direction == 'up'): continue # ignore this keyEvt and move on # we found a valid keyEvt keys.append(keyEvt) # remove the (1st occurrence of) string from the buffer inputStr = inputStr.replace(keyString, '', 1) return keys
[docs] def readMessage(self): """Read and return an unformatted string from the device (and delete this from the buffer) """ nToGet = self.port.inWaiting() return self.port.read(nToGet)
[docs] def measureRoundTrip(self): # round trip self.sendMessage(b'e4') # start round trip # wait for 'X' while True: if self.readMessage() == 'X': break self.sendMessage(b'X') # send it back # wait for final time info msgBack = '' while len(msgBack) == 0: msgBack = self.readMessage() tStr = msgBack[2:] t = struct.unpack('H', tStr)[0] # 2 bytes (an unsigned short) return t
[docs] def waitKeyEvents(self, allowedKeys=(1, 2, 3, 4, 5, 6, 7), downOnly=True): """Like getKeyEvents, but waits until a key is pressed """ noKeyYet = True while noKeyYet: keys = self.getKeyEvents( allowedKeys=allowedKeys, downOnly=downOnly) if len(keys) > 0: noKeyYet = False return keys
[docs] def resetTrialTimer(self): self.sendMessage(b'e5')
[docs] def resetBaseTimer(self): self.sendMessage(b'e1')
[docs] def getBaseTimer(self): """Retrieve the current time on the base timer """ self.sendMessage(b'e3') # core.wait(0.05) localTimer = core.Clock() msg = self.readMessage() ii = msg.find('e3') tStr = msg[ii + 2:ii + 6] # 4 bytes (an int) of time info t = struct.unpack('I', tStr)[0] return t
[docs] def getInfo(self): """Get the name of this device """ self.sendMessage(b'_d1') core.wait(0.1) return self.readMessage()
if __name__ == "__main__": pass

Back to top