#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Originally part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2022 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).
# Acknowledgements:
# This code was initially written by Jon Peirce.
# with substantial additions by Andrew Schofield
# CRS Ltd provided support as needed.
# Shader code for mono++ and color++ modes was based on code in Psychtoolbox
# (Kleiner) but does not actually use that code directly
import os
import sys
import time
import glob
import weakref
import serial
import numpy as np
from copy import copy, deepcopy
from time import sleep
import time
# Python 3.8 removed time.clock()
if sys.version_info < (3, 8):
clock = time.clock
else:
clock = time.perf_counter
from . import shaders
from psychopy import logging, core
from psychopy.hardware import serialdevice
import threading
try:
import Queue
except Exception:
import queue as Queue
__docformat__ = "restructuredtext en"
DEBUG = True
plotResults = False
if plotResults:
from matplotlib import pyplot
try:
from psychopy.ext import _bits
haveBitsDLL = True
except Exception:
haveBitsDLL = False
if DEBUG: # we don't want error skipping in debug mode!
from . import shaders
haveShaders = True
else:
try:
from . import shaders
haveShaders = True
except Exception:
haveShaders = False
try:
import configparser
except Exception:
import ConfigParser as configparser
# Bits++ modes
bits8BITPALETTEMODE = 0x00000001 # /* normal vsg mode */
NOGAMMACORRECT = 0x00004000 # /* Gamma correction mode */
GAMMACORRECT = 0x00008000 # /* Gamma correction mode */
VIDEOENCODEDCOMMS = 0x00080000 # must set so that LUT is read from screen
[docs]
class status(dict):
    """Dict of Bits# status values whose keys can also be used as attributes.

    The constructor stores the keys 'sample', 'time', 'trigIn', 'DIN'
    (a list of 10 copies of `bitsvals`), 'DWORD', 'IR' (a list of 6) and
    'ADC' (a list of 6).  Reading an attribute that is not a key returns
    None instead of raising.
    """

    def __init__(self, sample=0, t=0, trigIn=0, bitsvals=0, IR=0, ADCs=0.0):
        initial = (
            ('sample', sample),
            ('time', t),
            ('trigIn', trigIn),
            ('DIN', [bitsvals] * 10),
            ('DWORD', bitsvals),
            ('IR', [IR] * 6),
            ('ADC', [ADCs] * 6),
        )
        for name, value in initial:
            self[name] = value

    def __getattr__(self, key):
        # only reached when normal attribute lookup fails; a missing key
        # gives None rather than an AttributeError
        return self.get(key)

    def __setattr__(self, key, value):
        # attributes are stored as dictionary items
        self[key] = value

    def __delattr__(self, key):
        try:
            del self[key]
        except KeyError as missing:
            raise AttributeError(missing)

    def __repr__(self):
        return '<sample {}>'.format(dict.__repr__(self))

    def __getstate__(self):
        # plain-dict snapshot used for pickling
        return dict(self)

    def __setstate__(self, value):
        for name in value:
            self[name] = value[name]
[docs]
class event(dict):
    """Dict describing a Bits# status event, with attribute-style access.

    The constructor stores the keys 'dir', 'source', 'time' and 'input'.
    Reading an attribute that is not a key returns None instead of raising.
    """

    def __init__(self, source='None', t=0, input=0, direction='None'):
        initial = (
            ('dir', direction),
            ('source', source),
            ('time', t),
            ('input', input),
        )
        for name, value in initial:
            self[name] = value

    def __getattr__(self, key):
        # only reached when normal attribute lookup fails; a missing key
        # gives None rather than an AttributeError
        return self.get(key)

    def __setattr__(self, key, value):
        # attributes are stored as dictionary items
        self[key] = value

    def __delattr__(self, key):
        try:
            del self[key]
        except KeyError as missing:
            raise AttributeError(missing)

    def __repr__(self):
        return '<event {}>'.format(dict.__repr__(self))

    def __getstate__(self):
        # plain-dict snapshot used for pickling
        return dict(self)

    def __setstate__(self, value):
        for name in value:
            self[name] = value[name]
[docs]
class touch(dict):
    """Dict describing a touch-screen press, with attribute-style access.

    The constructor stores the keys 'time', 'x', 'y' and 'dir'.  Reading
    an attribute that is not a key returns None instead of raising.
    """

    def __init__(self, t=0, x=0, y=0, direction='down'):
        initial = (
            ('time', t),
            ('x', x),
            ('y', y),
            ('dir', direction),
        )
        for name, value in initial:
            self[name] = value

    def __getattr__(self, key):
        # only reached when normal attribute lookup fails; a missing key
        # gives None rather than an AttributeError
        return self.get(key)

    def __setattr__(self, key, value):
        # attributes are stored as dictionary items
        self[key] = value

    def __delattr__(self, key):
        try:
            del self[key]
        except KeyError as missing:
            raise AttributeError(missing)

    def __repr__(self):
        return '<touch {}>'.format(dict.__repr__(self))

    def __getstate__(self):
        # plain-dict snapshot used for pickling
        return dict(self)

    def __setstate__(self, value):
        for name in value:
            self[name] = value[name]
[docs]
class BitsPlusPlus:
"""The main class to control a Bits++ box.
This is usually a class added within the window object and is
typically accessed from there. e.g.::
from psychopy import visual
from psychopy.hardware import crs
win = visual.Window([800,600])
bits = crs.BitsPlusPlus(win, mode='bits++')
# use bits++ to reduce the whole screen contrast by 50%:
bits.setContrast(0.5)
"""
def __init__(self,
             win,
             contrast=1.0,
             gamma=None,
             nEntries=256,
             mode='bits++',
             rampType='configFile',
             frameRate=None):
    """
    Parameters
    -----------
    win :
        The window this device is attached to. Its `_prepareFBOrender`,
        `_finishFBOrender` and `_afterFBOrender` hooks are replaced by
        this object's methods.
    contrast :
        The contrast to be applied to the LUT.
        See :func:`BitsPlusPlus.setLUT` and
        :func:`BitsPlusPlus.setContrast` for flexibility on setting
        just a section of the LUT to a different value
    gamma :
        The value used to correct the gamma in the LUT. May be None
        (inherit `win.gamma`), a single number (applied to all three
        guns) or a sequence whose last three entries are used
        ([Lum,R,G,B] or [R,G,B]).
    nEntries : 256
        [DEPRECATED feature]
    mode : 'bits++' (or 'mono++' or 'color++')
        Note that, unlike the Bits#, this only affects the way the
        window is rendered, it does not switch the state of the Bits++
        device itself (because unlike the Bits# have no way to
        communicate with it).
        The mono++ and color++ are only supported in PsychoPy 1.82.00
        onwards. Even then they suffer from not having gamma
        correction applied on Bits++ (unlike Bits# which can apply
        a gamma table in the device hardware).
    rampType : 'configFile', None or an integer
        if 'configFile' then we'll look for a valid config in the
        userPrefs folder. If an integer then this will be used during
        win.setGamma(rampType=rampType):
    frameRate : an estimate of the frame rate of the monitor. If None
        the frame rate is measured with `win.getActualFrameRate()`.
    """
    self.win = win
    self.contrast = contrast
    self.nEntries = nEntries
    self.mode = mode
    # Frame rate is used to calculate trigger packets but for some reason
    # Bits++ needs it to be faked (scaled to 90%)
    if frameRate is None:
        frameRate = self.win.getActualFrameRate()
    self.frameRate = frameRate * 0.9
    # used to allow setting via USB which was 'slow':
    self.method = 'fast'
    # Bits++ doesn't do its own correction so we need to:
    self.gammaCorrect = 'software'
    # import pyglet.GL late so that we can import bits.py without it
    # initially
    global GL, visual
    from psychopy import visual
    import pyglet.gl as GL
    if self.gammaCorrect == 'software':
        if gamma is None:
            # inherit from window:
            self.gamma = win.gamma
        elif np.ndim(gamma) == 0:
            # a bare number has no len(); use it for every gun
            self.gamma = [gamma, gamma, gamma]
        elif len(gamma) > 2:
            # [Lum,R,G,B] or [R,G,B]
            self.gamma = gamma[-3:]
        else:
            self.gamma = [gamma, gamma, gamma]
    # init() and setVideoMode() are module-level functions defined
    # elsewhere in this file
    if init():
        setVideoMode(NOGAMMACORRECT | VIDEOENCODEDCOMMS)
        self.initialised = True
        logging.debug('Found and initialised Bits++')
    else:
        self.initialised = False
        logging.warning("Couldn't initialise Bits++")
    # do the processing
    self._setHeaders(self.frameRate)
    self.setLUT()  # this will set self.LUT and update self._HEADandLUT
    self._setupShaders()
    # replace window methods with our custom ones
    self.win._prepareFBOrender = self._prepareFBOrender
    self.win._finishFBOrender = self._finishFBOrender
    self.win._afterFBOrender = self._afterFBOrender
    # set gamma of the window to the identity LUT
    if rampType == 'configFile':
        # now check that we have a valid configuration of the box
        self.config = Config(self)
        # check we match the prev config for our graphics card etc
        if self.config.quickCheck():
            self.win.gammaRamp = self.config.identityLUT
        else:
            rampType = None
    if rampType != 'configFile':
        # this must NOT be an `else` from the above `if` because rampType
        # can be overridden there; possibly we were given a numerical
        # rampType (as in the :func:`psychopy.gamma.setGamma()`)
        self.win.winHandle.setGamma(self.win.winHandle._dc,
                                    rampType=rampType)
#==================================#
# Helper function for __init__ #
#==================================#
def _setHeaders(self, frameRate):
""" Sets up the TLock header codes and some flags that are
common to operating all CRS devices
"""
# TLock for setting LUTs in the CRS device
self._HEADandLUT = np.zeros((524, 1, 3), np.uint8)
# R
valsR = (36, 63, 8, 211, 3, 112, 56, 34, 0, 0, 0, 0)
self._HEADandLUT[:12, :, 0] = np.asarray(valsR).reshape([12, 1])
# G
valsG = (106, 136, 19, 25, 115, 68, 41, 159, 0, 0, 0, 0)
self._HEADandLUT[:12, :, 1] = np.asarray(valsG).reshape([12, 1])
# B
valsB = (133, 163, 138, 46, 164, 9, 49, 208, 0, 0, 0, 0)
self._HEADandLUT[:12, :, 2] = np.asarray(valsB).reshape([12, 1])
self.LUT = np.zeros((256, 3), 'd') # just a place holder
#TLock Header for everything else
self._NumberPackets = int(round(1+10000/frameRate,0))
# R:
TLockR = (69, 40, 19, 119, 52, 233, 41, 183,
0, 1, 0, 2, 0, 3, 0, 6, 0, 7, 0)
# G:
TLockG = (33, 230, 190, 84, 12, 108,201, 124,
0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 0)
# B:
TLockB = (56, 208, 102, 207, 192, 172,80, 221,
self._NumberPackets-1, 0, 0, 0, 0, 0, 0, 2, 0, 255, 0)
# The following is used to reset the Bits# clock
self._HEADandClock = np.zeros(((self._NumberPackets*2)+19,1,3),
np.uint8)
self._HEADandClock[:19,:,0] = np.asarray(TLockR).reshape([19,1])#R
self._HEADandClock[:19,:,1] = np.asarray(TLockG).reshape([19,1])#G
self._HEADandClock[:19,:,2] = np.asarray(TLockB).reshape([19,1])#B
self._HEADandClock[15,:,1] = 12 # Pixel 15 green = 12 to reset clock.
self._HEADandClockstr = self._HEADandClock.tostring()
# The following are used to send triggers and control FE1 goggles
# via the digital output lines.
self._HEADandTrig = np.zeros(((self._NumberPackets*2)+19,1,3),
np.uint8)
self._HEADandGogLeftOpen = np.zeros(((self._NumberPackets*2)+19,1,3),
np.uint8)
self._HEADandGogRightOpen = np.zeros(((self._NumberPackets*2)+19,1,3),
np.uint8)
self._HEADandGogBothOpen = np.zeros(((self._NumberPackets*2)+19,1,3),
np.uint8)
self._HEADandGogBothClosed = np.zeros(((self._NumberPackets*2)+19,1,3)
, np.uint8)
self._HEADandTrig[:19,:,0] = np.asarray(TLockR).reshape([19,1])#R
self._HEADandTrig[:19,:,1] = np.asarray(TLockG).reshape([19,1])#G
self._HEADandTrig[:19,:,2] = np.asarray(TLockB).reshape([19,1])#B
self._HEADandGogLeftOpen[:19,:,0] = (
np.asarray(TLockR).reshape([19,1]))
self._HEADandGogLeftOpen[:19,:,1] = (
np.asarray(TLockG).reshape([19,1]))
self._HEADandGogLeftOpen[:19,:,2] = (
np.asarray(TLockB).reshape([19,1]))
self._HEADandGogRightOpen[:19,:,0] = (
np.asarray(TLockR).reshape([19,1]))
self._HEADandGogRightOpen[:19,:,1] = (
np.asarray(TLockG).reshape([19,1]))
self._HEADandGogRightOpen[:19,:,2] = (
np.asarray(TLockB).reshape([19,1]))
self._HEADandGogBothOpen[:19,:,0] = (
np.asarray(TLockR).reshape([19,1]))
self._HEADandGogBothOpen[:19,:,1] = (
np.asarray(TLockG).reshape([19,1]))
self._HEADandGogBothOpen[:19,:,2] = (
np.asarray(TLockB).reshape([19,1]))
self._HEADandGogBothClosed[:19,:,0] = (
np.asarray(TLockR).reshape([19,1]))
self._HEADandGogBothClosed[:19,:,1] = (
np.asarray(TLockG).reshape([19,1]))
self._HEADandGogBothClosed[:19,:,2] = (
np.asarray(TLockB).reshape([19,1]))
# flags for controlling triggers, goggles and analog outputs
self.trigger=False
self.clockReset=False
self.clockReset=False
self.gogglesGo=False
self.gogglesLeft = 0
self.gogglesRight = 1
# Set up some blank triggers
self.setTrigger()
self.triggerProtected = False
#==========================================#
# Lut Functions #
#==========================================#
[docs]
def setLUT(self, newLUT=None, gammaCorrect=True, LUTrange=1.0):
"""Sets the LUT to a specific range of values in 'bits++' mode only
Note that, if you leave gammaCorrect=True then any LUT values you
supply will automatically be gamma corrected.
The LUT will take effect on the next `Window.flip()`
**Examples**:
- `bitsBox.setLUT()` to build a LUT using bitsBox.contrast and bitsBox.gamma
- `bitsBox.setLUT(newLUT=some256x1array)` (NB array should be float 0.0:1.0)
Builds a luminance LUT using newLUT for each gun
(actually array can be 256x1 or 1x256)
- `bitsBox.setLUT(newLUT=some256x3array)`
(NB array should be float 0.0:1.0)
Allows you to use a different LUT on each gun
(NB by using BitsBox.setContr() and BitsBox.setGamma() users may not
need this function)
"""
# choose endpoints
LUTrange = np.asarray(LUTrange)
if LUTrange.size == 1:
startII = int(round((0.5 - LUTrange/2.0) * 255.0))
# +1 because python ranges exclude last value:
endII = int(round((0.5 + LUTrange/2.0) * 255.0)) + 1
elif LUTrange.size == 2:
multiplier = 1.0
if LUTrange[1] <= 1:
multiplier = 255.0
startII = int(round(LUTrange[0] * multiplier))
# +1 because python ranges exclude last value:
endII = int(round(LUTrange[1] * multiplier)) + 1
stepLength = 2.0/(endII - startII - 1)
if newLUT is None:
# create a LUT from scratch (based on contrast and gamma)
# rampStep = 2.0/(self.nEntries-1)
ramp = np.arange(-1.0, 1.0 + stepLength, stepLength)
ramp = (ramp * self.contrast + 1.0)/2.0
# self.LUT will be stored as 0.0:1.0 (gamma-corrected)
self.LUT[startII:endII, 0] = copy(ramp)
self.LUT[startII:endII, 1] = copy(ramp)
self.LUT[startII:endII, 2] = copy(ramp)
elif type(newLUT) in [float, int] or (newLUT.shape == ()):
self.LUT[startII:endII, 0] = newLUT
self.LUT[startII:endII, 1] = newLUT
self.LUT[startII:endII, 2] = newLUT
elif len(newLUT.shape) == 1:
# one dimensional LUT
# replicate LUT to other channels, check range is 0:1
if newLUT > 1.0:
logging.warning('newLUT should be float in range 0.0:1.0')
self.LUT[startII:endII, 0] = copy(newLUT.flat)
self.LUT[startII:endII, 1] = copy(newLUT.flat)
self.LUT[startII:endII, 2] = copy(newLUT.flat)
elif len(newLUT.shape) == 2:
# one dimensional LUT
# use LUT as is, check range is 0:1
if max(max(newLUT)) > 1.0:
raise AttributeError('newLUT should be float in range 0.0:1.0')
self.LUT[startII:endII, :] = newLUT
else:
logging.warning('newLUT can be None, nx1 or nx3')
# do gamma correction if necessary
if self.gammaCorrect == 'software':
gamma = self.gamma
try:
lin = self.win.monitor.linearizeLums
self.LUT[startII:endII, :] = lin(self.LUT[startII:endII, :],
overrideGamma=gamma)
except AttributeError:
try:
lin = self.win.monitor.lineariseLums
self.LUT[startII:endII, :] = lin(self.LUT[startII:endII, :],
overrideGamma=gamma)
except AttributeError:
pass
# update the bits++ box with new LUT
# get bits into correct order, shape and add to header
# go from ubyte to uint16
ramp16 = (self.LUT * (2**16 - 1)).astype(np.uint16)
ramp16 = np.reshape(ramp16, (256, 1, 3))
# set most significant bits
self._HEADandLUT[12::2, :, :] = (ramp16[:, :, :] >> 8).astype(np.uint8)
# set least significant bits
self._HEADandLUT[13::2, :, :] = (
ramp16[:, :, :] & 255).astype(np.uint8)
self._HEADandLUTstr = self._HEADandLUT.tostring()
[docs]
def setContrast(self, contrast, LUTrange=1.0, gammaCorrect=None):
"""Set the contrast of the LUT for 'bits++' mode only
:Parameters:
contrast : float in the range 0:1
The contrast for the range being set
LUTrange : float or array
If a float is given then this is the fraction of the LUT
to be used. If an array of floats is given, these will
specify the start / stop points as fractions of the LUT.
If an array of ints (0-255) is given these determine the
start stop *indices* of the LUT
Examples:
- `setContrast(1.0,0.5)` to set the central 50% of the LUT so that a stimulus with
contr=0.5 will actually be drawn with contrast 1.0
- `setContrast(1.0,[0.25,0.5])`
- or `setContrast(1.0,[63,127])` to set the lower-middle quarter of the LUT
(which might be useful in LUT animation paradigms)
"""
self.contrast = contrast
if gammaCorrect is None:
if gammaCorrect not in [False, "hardware"]:
gammaCorrect = False
else:
gammaCorrect = True
# setLUT uses contrast automatically
self.setLUT(newLUT=None, gammaCorrect=gammaCorrect, LUTrange=LUTrange)
[docs]
def setGamma(self, newGamma):
"""Set the LUT to have the requested gamma value
Currently also resets the LUT to be a linear contrast
ramp spanning its full range. May change this to read
the current LUT, undo previous gamma and then apply
new one?"""
self.gamma = newGamma
self.setLUT() # easiest way to update
#=================================================#
# Bits Clock Functions #
#=================================================#
[docs]
def resetClock(self):
"""Issues a clock reset code using 1 screen flip
if the next frame(s) is dropped the reset will be
re-issued thus keeping timing good.
Resets continue to be issued on each video frame until
the next win.flip so you need to have regular win.flips for
this function to work properly.
Example:
bits.resetClock()
drawImage()
bits.win.flip()
Will issue clock resets while the image is being drawn then
display the image and allow the clock to continue from the
same frame.
Example:
bits.resetClock()
bits.RTBoxWait()
bits.win.flip()
Will issue clock resets until a button is pressed.
"""
self.clockReset=True
self.win.flip() # Send reset signal this frame, reset will happen next frame.
# If next winflip is late the reset flag should latch thus
# resetting clock until the next winflip
[docs]
def primeClock(self):
"""Primes the clock to reset at the next screen
flip - note only 1 clock reset signal will be issued
but if the frame(s) after the reset frame is
dropped the reset will be re-issued thus keeping timing good.
Resets continute to be issued on each video frame until
the next win.flip so you need to have regular win.flips for
this function to work properly.
Example::
bits.primeClock()
drawImage
while not response
#do some processing
bits.win.flip()
Will get a clock reset signal ready but won't issue it until the first win.flip in the loop.
"""
self.clockReset=True
[docs]
def syncClocks(self,t):
"""Synchronise the Bits/RTBox Clock with the host clock
Given by t.
"""
self.clockReset=True
self.win.flip()
self.clockReset=False
self.win.flip()
t.reset()
#=================================================#
# Bits Trigger and Stereo Goggle Functions #
#=================================================#
[docs]
def getPackets(self):
"""Returns the number of packets available for trigger pulses.
"""
return self._NumberPackets
[docs]
def setTrigger(self, triggers=0, onTime=0,
duration=0, mask=0xFFFF):
""" Quick way to set up triggers.
Triggers is a binary word that determines which
triggers will be turned on.
onTime specifies the start time of the trigger within
the frame (in S with 100uS resolution)
Duration specifies how long the trigger will last.
(in S with 100uS resolution).
Note that mask only protects the digital output lines
set by other activities in the Bits. Not other triggers.
Example::
bits.setTrigger(0b0000000010, 2.0, 4.0, 0b0111111111)
bits.startTrigger()
Will issue a 4ms long high-going pulse, 2ms after the start
of each frame on DOUT1 while protecting the value of DOUT 9.
"""
# Convert on time and duration into device units.
sOnTime = int(round(onTime*10000.0, 0))
sDuration = int(round(duration*10000.0, 0))
# preallocate an empty data packet
packet = [0]*self._NumberPackets
# Set some elements of the packet equal to the desired trigger pattern.
for index in range(sOnTime, int(sOnTime + sDuration) ):
packet[index] = triggers
# Set up the actual trigger headers.
self.setTriggerList(triggerList=packet, mask=mask)
[docs]
def setTriggerList(self, triggerList=None, mask=0xFFFF):
""" Sets up Trigger pulses in Bits++ using the fine grained
method that can control every trigger line at 100uS
intervals.
TriggerList should contain 1 entry for every 100uS
packet (see getPackets) the binary word in each entry
specifies which trigger line will be active during that
time slot.
Note that mask only protects the digital output lines
set by other activities in the Bits. Not other triggers.
Example::
packet = [0]*self._NumberPackets
packet[0] = 0b0000000010
bits.setTriggerList(packet)
Will sens a 100us pulse on DOUT1 at the start of the frame.
Example 2::
packet = [0]*self._NumberPackets
packet[10] = 0b0000000010
packet[20] = 0b0000000001
bits.setTriggerList(packet)
bits.startTrigger()
Will sens a 100us pulse on DOUT1 1000us after the start of the
frame and a second 100us pusle on DOUT0 2000us after the start of
the frame.
Triggers will continue until stopTrigger is called.
"""
if len(triggerList) < self._NumberPackets:
warning = ("setTriggerList: TriggerList does not "
"contain enough data")
raise AssertionError(warning)
# Constants for FE1 via D25 connector goggle settings.
bothOpen = 16
leftOpen = 0
rightOpen = 32
bothClosed = 48
if self.gogglesGo: # Force mask to include goggles
mask = (mask & 0b1111111111001111) +48
# For every item in data packet representing trigger values.
for index in range(0,self._NumberPackets):
# Select the item
trig=triggerList[index]
# Mask out the original tirggers and add in the four possible
# goggle states.
trigBothOpen = (trig & 0b1111111111001111) + bothOpen
trigLeftOpen = (trig & 0b1111111111001111) + leftOpen
trigRightOpen = (trig & 0b1111111111001111) + rightOpen
trigBothClosed = (trig & 0b1111111111001111) + bothClosed
# Set up the TLock Memory address indexes for the trigger alone
# and the trigger with all possible goggle states.
self._HEADandTrig[19+index*2,:,0] = 8 + index
self._HEADandGogLeftOpen[19+index*2,:,0] = 8 + index
self._HEADandGogRightOpen[19+index*2,:,0] = 8 + index
self._HEADandGogBothOpen[19+index*2,:,0] = 8 + index
self._HEADandGogBothClosed[19+index*2,:,0] = 8 + index
# Set the data payload within the TLock for the trigger alone.
self._HEADandTrig[19+index*2,:,1] = int(np.floor(trig / 256))
self._HEADandTrig[19+index*2,:,2] = int(np.remainder(trig,256))
# Set the data payloads for the triggers with all possible goggle states.
self._HEADandGogLeftOpen[19+index*2,:,1] = int(
np.floor(trigLeftOpen / 256))
self._HEADandGogLeftOpen[19+index*2,:,2] = int(
np.remainder(trigLeftOpen, 256))
self._HEADandGogRightOpen[19+index*2,:,1] = int(
np.floor(trigRightOpen / 256))
self._HEADandGogRightOpen[19+index*2,:,2] = int(
np.remainder(trigRightOpen, 256))
self._HEADandGogBothOpen[19+index*2,:,1] = int(
np.floor(trigBothOpen / 256))
self._HEADandGogBothOpen[19+index*2,:,2] = int(
np.remainder(trigBothOpen, 256))
self._HEADandGogBothClosed[19+index*2,:,1] = int(
np.floor(trigBothClosed / 256))
self._HEADandGogBothClosed[19+index*2,:,2] = int(
np.remainder(trigBothClosed, 256))
# Set the hardware mask in each TLock.
self._HEADandTrig[17,:,1]=int(np.floor(mask/256))
self._HEADandTrig[17,:,2]=int(np.remainder(mask,256))
self._HEADandGogLeftOpen[17,:,1]=int(np.floor(mask/256))
self._HEADandGogLeftOpen[17,:,2]=int(np.remainder(mask,256))
self._HEADandGogRightOpen[17,:,1]=int(np.floor(mask/256))
self._HEADandGogRightOpen[17,:,2]=int(np.remainder(mask,256))
self._HEADandGogBothOpen[17,:,1]=int(np.floor(mask/256))
self._HEADandGogBothOpen[17,:,2]=int(np.remainder(mask,256))
self._HEADandGogBothClosed[17,:,1]=int(np.floor(mask/256))
self._HEADandGogBothClosed[17,:,2]=int(np.remainder(mask,256))
# Turn the trigger only payload into a string
# The goggles payloads are dealt with elsewhere.
self._HEADandTrigStr = self._HEADandTrig.tostring()
# Any attempt to set the triggers should un-protect them
# since by definition the protected values are no longer valid.
self.triggerProtected = False
[docs]
def sendTrigger(self, triggers=0, onTime=0, duration=0,
mask=65535):
""" Sends a single trigger using up 1 win.flip.
The trigger will be sent on the following frame.
The triggers will continue until after the next win.flip.
Actions are always 1 frame after the request.
May do odd things if Goggles and Analog are also
in use.
Example::
bits.sendTrigger(0b0000000010, 2.0, 4.0)
bits.win.flip()
Will send a 4ms puilse on DOUT1 2ms after the start of the frame.
Due to the following win.flip() the pulse should last for 1 frame only.
Triggers will continue until stopTrigger is called.
"""
self.setTrigger(triggers,onTime,duration,mask)
self.trigger=True
self.win.flip() # Send the trigger but trigger not acted on until the next frame.
#If next winflip is late the trigger will be repeated until it is cleared by the next winflip
self.trigger=False
[docs]
def startTrigger(self):
""" Start sending triggers on the next win flip
and continue until stopped by stopTrigger
Triggers start 1 frame after the frame on which
the first trigger is sent.
Example::
bits.setTrigger(0b0000000010, 2.0, 4.0, 0b0111111111)
bits.startTrigger()
while imageOn:
#do some processing
continue
bits.stopTrigger()
bits.win.flip()
"""
# If triggers have been protected from another TLock action
# we will need to restore them first.
if self.triggerProtected:
self._restoreTrigger()
self.triggerProtected = False
self.trigger=True
[docs]
def stopTrigger(self):
    """ Stop sending triggers at the next win flip

    Sends one frame of blank (zero) triggers with `self.win.flip()`,
    then switches triggers off and puts the previous trigger pattern
    back so it can be started again later.

    Example:

    .. code-block:: python

        bits.setTrigger(0b0000000010, 0.002, 0.004, 0b0111111111)
        bits.startTrigger()
        while imageOn:
            #do some processing
            continue
        bits.stopTrigger()
        bits.win.flip()
    """
    # This is a hack to get triggers to stop on the one
    # Bits++ box tested.
    # The order of the statements below matters: _protectTrigger and
    # _restoreTrigger both do nothing while self.trigger is True.
    self.trigger=False # Needed before protecting triggers.
    # Protect existing triggers. Also sets triggers to zero.
    self._protectTrigger()
    self.trigger=True # Needed to send the zero trigger.
    self.win.flip() # Make sure zero trigger is sent.
    self.trigger=False # Now turn off triggers.
    self._restoreTrigger() # Recover old triggers for
    # Future use
[docs]
def startGoggles(self, left = 0, right = 1):
"""Starts CRS stereo goggles. Note if you are
using FE-1 goggles you should start this before
connecting the goggles.
Left is the state of the left shutter on the
first frame to be presented 0, False or
'closed'=closed; 1, True or 'open' = open,
right is the state of the right shutter on the
first frame to be presented 0, False or
'closed'=closed; 1, True or 'open' = open
Note you can set the goggles to be both open
or both closed on the same frame.
The system will always toggle the state of
each lens so as to not damage FE-1 goggles.
Example::
bits.startGoggles(0,1)
bits.win.flip()
while not response
bits.win.flip()
#do some processing
bits.stopGoggles()
bits.win.flip()
Starts toggling the goggles with the right eye open in sync with the
first win.flip() within the loop. The open eye will alternate.
Example::
bits.startGoggles(1,1)
bits.win.flip()
while not response:
bits.win.flip()
#do some processing
bits.stopGoggles()
bits.win.flip()
Starts toggling the goggle with both eyes open in sync with the first
win.flip() within the loop. Eyes will alternate between both open and both closed.
Note it is safe to leave the goggles toggling forever, ie to never call stopGoggles().
"""
# Protect any existing trigger settings if required.
self._protectTrigger() # Also sets triggers to zero.
if left in ('closed','Closed'):
self.gogglesLeft = 0
elif left in ('open','Open'):
self.gogglesLeft = 1
else:
self.gogglesLeft = int(left)
if right in ('closed','Closed'):
self.gogglesRight = 0
elif right in ('open','Open'):
self.gogglesRight = 1
else:
self.gogglesRight = int(right)
self.gogglesGo = True
[docs]
def stopGoggles(self):
""" Stop the stereo goggles from toggling
Example::
bits.startGoggles(0,1)
bits.win.flip()
while not response:
bits.win.flip()
#do some processing
bits.stopGoggles()
bits.win.flip()
Starts toggling the goggles with the right eye open in sync with the
first win.flip(0) within the loop. The open eye will alternate.
Note it is safer to leave the goggles toggling forever, ie to never call stopGoggles().
"""
# Restore old triggers if goggles used without other triggers.
self._restoreTrigger()
self.stopTrigger() #A hack for Bits++
self.gogglesGo = False
[docs]
def reset(self):
    """Deprecated: This was used on the old Bits++
    to power-cycle the box.

    It required the compiled dll, which only worked
    on windows and doesn't work with Bits# or Display++.

    Simply delegates to the module-level `reset()` function (inside a
    method body the bare name refers to the module's function, not to
    this method).
    """
    moduleReset = reset
    moduleReset()
#==========================================#
# Helper functions for LUTs and Triggers #
# Should not be needed by user #
#==========================================#
def _protectTrigger(self):
""" If Goggles (or analog) outputs are used when the
digital triggers are off we need to make a set of blank
triggers first. But the user might have set up triggers
in waiting for a later time. So this will protect them.
"""
# No need to do this if triggers are active anyway.
# or if they are already being protected.
if not self.trigger and not self.triggerProtected:
self._keepTrig = deepcopy(self._HEADandTrig)
self._keepGogLeftOpen = deepcopy(self._HEADandGogLeftOpen)
self._keepGogRightOpen = deepcopy(self._HEADandGogRightOpen)
self._keepGogBothOpen = deepcopy(self._HEADandGogBothOpen)
self._keepGogBothClosed = deepcopy(self._HEADandGogBothClosed)
self.setTrigger()
# Set flag to tell trigger start that it has to recover its
# trigger headers.
self.triggerProtected = True
def _restoreTrigger(self):
""" Restores the triggers to previous settings
"""
# No need to do this if triggers are running as will have been
# recovered already if required.
if not self.trigger:
self._HEADandTrig = deepcopy(self._keepTrig)
self._HEADandTrigStr = self._HEADandTrig.tostring()
self._HEADandGogLeftOpen = deepcopy(self._keepGogLeftOpen)
self._HEADandGogRightOpen = deepcopy(self._keepGogRightOpen)
self._HEADandGogBothOpen = deepcopy(self._keepGogBothOpen)
self._HEADandGogBothClosed = deepcopy(self._keepGogBothClosed)
# Set flag to tell trigger start that it has no need to recover its
# trigger headers.
self.triggerProtected = False
def _drawLUTtoScreen(self):
    """(private) Used to set the LUT in 'bits++' mode.

    Draws the `_HEADandLUTstr` bytes (header plus LUT, prepared by
    `setLUT`) as a one-pixel-high strip at raster position (0, 1).

    Should not be needed by user if attached to a
    :class:`~psychopy.visual.Window` since this will automatically
    draw the LUT as part of the screen refresh.
    """
    # push the projection matrix and set to orthographic
    GL.glMatrixMode(GL.GL_PROJECTION)
    GL.glPushMatrix()
    GL.glLoadIdentity()
    # this also sets the 0,0 to be top-left
    GL.glOrtho(0, self.win.size[0], self.win.size[1], 0, 0, 1)
    # but return to modelview for rendering
    GL.glMatrixMode(GL.GL_MODELVIEW)
    GL.glLoadIdentity()
    # unbind whatever is on texture units 0 and 1 before drawing the pixels
    GL.glActiveTexture(GL.GL_TEXTURE0)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    GL.glActiveTexture(GL.GL_TEXTURE1)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    # draw the pixels
    GL.glRasterPos2i(0, 1)
    GL.glPixelStorei(GL.GL_UNPACK_ALIGNMENT, 1)
    # width is the number of pixels in the header+LUT buffer
    GL.glDrawPixels(len(self._HEADandLUT), 1,
                    GL.GL_RGB, GL.GL_UNSIGNED_BYTE,
                    self._HEADandLUTstr)
    # GL.glDrawPixels(524,1, GL.GL_RGB,GL.GL_UNSIGNED_BYTE,
    # self._HEADandLUTstr)
    # return to 3D mode (go and pop the projection matrix)
    GL.glMatrixMode(GL.GL_PROJECTION)
    GL.glPopMatrix()
    GL.glMatrixMode(GL.GL_MODELVIEW)
def _ResetClock(self):
    """(private) Used to reset Bits hardware clock.

    Draws the `_HEADandClockstr` bytes (prepared by `_setHeaders`) as a
    one-pixel-high strip at raster position (0, 2) and then clears the
    `clockReset` flag.

    Should not be needed by user if attached to a
    :class:`~psychopy.visual.Window`
    since this will automatically draw the
    reset code as part of the screen refresh.
    """
    #push the projection matrix and set to orthographic
    GL.glMatrixMode(GL.GL_PROJECTION)
    GL.glPushMatrix()
    GL.glLoadIdentity()
    GL.glOrtho( 0, self.win.size[0],self.win.size[1], 0, 0, 1 ) #this also sets the 0,0 to be top-left
    #but return to modelview for rendering
    GL.glMatrixMode(GL.GL_MODELVIEW)
    GL.glLoadIdentity()
    # unload texture
    GL.glActiveTexture(GL.GL_TEXTURE0)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    # unload mask
    GL.glActiveTexture(GL.GL_TEXTURE1)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    # draw the pixels
    GL.glRasterPos2i(0,2)
    GL.glPixelStorei(GL.GL_UNPACK_ALIGNMENT, 1)
    # width is the number of pixels in the clock-reset buffer
    GL.glDrawPixels(len(self._HEADandClock),1,
                    GL.GL_RGB,GL.GL_UNSIGNED_BYTE,
                    self._HEADandClockstr)
    #return to 3D mode (go and pop the projection matrix)
    GL.glMatrixMode( GL.GL_PROJECTION )
    GL.glPopMatrix()
    GL.glMatrixMode( GL.GL_MODELVIEW )
    # ensures that only 1 clock reset pulse will be issued at a time
    self.clockReset=False
def _drawTrigtoScreen(self, sendStr=None):
    """(private) Used to send a trigger pulse.

    Draws a trigger TLock as a one-pixel-high strip at raster position
    (0, 3).

    :Parameters:
        sendStr : bytes or None
            The pixel data to draw. None means the trigger-only payload
            `self._HEADandTrigStr`. The strip is always drawn
            `len(self._HEADandTrig)` pixels wide, so the data must
            describe a buffer of that size.

    Should not be needed by user if attached to a
    :class:`~psychopy.visual.Window`
    since this will automatically draw the trigger code as
    part of the screen refresh.
    """
    if sendStr is None:
        sendStr = self._HEADandTrigStr
    # push the projection matrix and set to orthographic
    GL.glMatrixMode(GL.GL_PROJECTION)
    GL.glPushMatrix()
    GL.glLoadIdentity()
    # this also sets the 0,0 to be top-left
    GL.glOrtho(0, self.win.size[0], self.win.size[1], 0, 0, 1)
    # but return to modelview for rendering
    GL.glMatrixMode(GL.GL_MODELVIEW)
    GL.glLoadIdentity()
    # unbind whatever is on texture units 0 and 1 before drawing the pixels
    GL.glActiveTexture(GL.GL_TEXTURE0)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    GL.glActiveTexture(GL.GL_TEXTURE1)
    GL.glEnable(GL.GL_TEXTURE_2D)
    GL.glBindTexture(GL.GL_TEXTURE_2D, 0)
    # draw the pixels
    GL.glRasterPos2i(0, 3)
    GL.glPixelStorei(GL.GL_UNPACK_ALIGNMENT, 1)
    GL.glDrawPixels(len(self._HEADandTrig), 1,
                    GL.GL_RGB, GL.GL_UNSIGNED_BYTE,
                    sendStr)
    # return to 3D mode (go and pop the projection matrix)
    GL.glMatrixMode(GL.GL_PROJECTION)
    GL.glPopMatrix()
    GL.glMatrixMode(GL.GL_MODELVIEW)
def _Goggles(self):
    """(private) Draw the goggle trigger for the current state, then toggle.

    Should not be needed by the user if attached to a
    :class:`~psychopy.visual.Window`.

    The state value is ``gogglesRight * 2 + gogglesLeft``; it selects which
    of the four pre-built goggle payloads is drawn:

    - 0: ``_HEADandGogBothOpen``
    - 1: ``_HEADandGogLeftOpen``
    - 2: ``_HEADandGogRightOpen``
    - 3: ``_HEADandGogBothClosed``

    Both flags are flipped (``1 - value``) before drawing, ready for the
    next call.
    """
    # Work out current goggles state value.
    gogglesState = self.gogglesRight * 2 + self.gogglesLeft
    # Toggle the goggle states ready for the next win flip
    self.gogglesLeft = 1 - self.gogglesLeft
    self.gogglesRight = 1 - self.gogglesRight
    # Serialise with tobytes(): ndarray.tostring() is a deprecated alias
    # that recent NumPy releases no longer provide.
    if gogglesState == 0:
        self._drawTrigtoScreen(self._HEADandGogBothOpen.tobytes())
    elif gogglesState == 1:
        self._drawTrigtoScreen(self._HEADandGogLeftOpen.tobytes())
    elif gogglesState == 2:
        self._drawTrigtoScreen(self._HEADandGogRightOpen.tobytes())
    elif gogglesState == 3:
        self._drawTrigtoScreen(self._HEADandGogBothClosed.tobytes())
def _setupShaders(self):
    """Compile the shader programs used by the mono++ and color++ modes.

    The programs are stored in ``self._shaders`` under the keys
    ``'mono++'`` and ``'color++'``. Nothing is created when the module
    level flag ``haveShaders`` is false.
    """
    if haveShaders:
        # the dict is attached first so it exists even if compiling fails
        compiled = self._shaders = {}
        build = shaders.compileProgram
        for modeName, fragName in (('mono++', 'bitsMonoModeFrag'),
                                   ('color++', 'bitsColorModeFrag')):
            compiled[modeName] = build(shaders.vertSimple,
                                       getattr(shaders, fragName))
def _prepareFBOrender(self):
    """Activate the shader program matching the current mode.

    mono++ and color++ use the programs held in ``self._shaders``; every
    other mode uses the window's own ``_progFBOtoFrame`` program.
    """
    currentMode = self.mode
    if currentMode == 'mono++' or currentMode == 'color++':
        program = self._shaders[currentMode]
    else:
        program = self.win._progFBOtoFrame
    GL.glUseProgram(program)
def _finishFBOrender(self):
    """Deselect the active shader program (``glUseProgram(0)``)."""
    noProgram = 0
    GL.glUseProgram(noProgram)
def _afterFBOrender(self):
    """Draw the LUT, goggle/trigger and clock-reset rows with blending off.

    - the LUT is drawn when the mode name starts with ``'bits'``
    - the goggle pattern is drawn when ``gogglesGo`` is set, otherwise
      the trigger row is drawn when ``trigger`` is set
    - the clock reset is issued when ``clockReset`` is set
    """
    GL.glDisable(GL.GL_BLEND)
    if self.mode.startswith('bits'):
        self._drawLUTtoScreen()
    if not self.gogglesGo:
        if self.trigger:
            self._drawTrigtoScreen()
    else:
        # the goggle call also sends triggers if started
        self._Goggles()
    if self.clockReset:
        self._ResetClock()
    GL.glEnable(GL.GL_BLEND)
[docs]
class BitsSharp(BitsPlusPlus, serialdevice.SerialDevice):
"""A class to support functions of the Bits# (and most Display++ functions
This device uses the CDC (serial port) connection to the Bits box.
To use it you must have followed the instructions from CRS Ltd. to get
your box into the CDC communication mode.
Typical usage (also see demo in Coder view demos>hardware>BitsBox ):
.. code-block:: python
from psychopy import visual
from psychopy.hardware import crs
# we need to be rendering to framebuffer
win = visual.Window([1024,768], useFBO=True)
bits = crs.BitsSharp(win, mode = 'mono++')
# You can continue using your window as normal and OpenGL shaders
# will convert the output as needed
print(bits.info)
if not bits.OK:
print('failed to connect to Bits box')
core.quit()
core.wait(0.1)
# now, you can change modes using
bits.mode = 'mono++' # 'color++', 'mono++', 'bits++', 'status'
Note that the firmware in Bits# boxes varies over time and some features of
this class may not work for all firmware versions. Also Bits# boxes can be
configured in various ways via their config.xml file so this class makes certain
assumptions about the configuration. In particular it is assumed that all
digital inputs, triggers and analog inputs are reported as part of status
updates. If some of these report are disabled in your config.xml file
then 'status' and 'event' commands in this class may not work.
RTBox commands that reset the key mapping have been found not to work
on some firmware.
"""
name = b'CRS Bits#'
def __init__(self, win=None,
             portName=None,
             mode='',
             checkConfigLevel=1,
             gammaCorrect='hardware',
             gamma=None,
             noComms=False):
    """Open the serial connection (unless noComms), read the frame rate,
    hook this object's render callbacks into the window, set the mode and
    initialise the RTBox / statusBox / status-logging members.

    Parameters
    ----------

    win : a PsychoPy :class:`~psychopy.visual.Window` object, required

    portName : str or int
        the (virtual) serial port to which the device is
        connected. If None then PsychoPy will search available
        serial ports and test communication (on OSX, the first
        match of `/dev/tty.usbmodemfa*` will be used and on
        linux `/dev/ttyS0` will be used)

    mode : 'bits++', 'color++', 'mono++', 'status'

    checkConfigLevel : int
        Allows you to specify how much checking of the device is
        done to ensure a valid identity look-up table. If you specify
        one level and it fails then the check will be escalated to
        the next level (e.g. if we check level 1 and find that it
        fails we try to find a new LUT):

            - 0 don't check at all
            - 1 check that the graphics driver and OS version haven't
              changed since last LUT calibration
            - 2 check that the current LUT calibration still provides
              identity (requires switch to status mode)
            - 3 search for a new identity look-up table (requires
              switch to status mode)

    gammaCorrect : string
        Governing how gamma correction is performed:

            - 'hardware': use the gamma correction file stored on the
              hardware
            - 'FBO': gamma correct using shaders when rendering the FBO
              to back buffer
            - 'bitsMode': in bits++ mode there is a user-controlled LUT
              that we can use for gamma correction

    gamma : stored unchanged as ``self.gamma``

    noComms : bool
        If True then don't try to communicate with the device at all
        (passive mode). This can be useful if you want to debug the
        system without actually having a Bits# connected.
    """
    # import pyglet.GL late so that we can import bits.py without it
    # initially
    global GL, visual
    from psychopy import visual
    import pyglet.gl as GL

    if noComms:
        # passive mode: the serial base-class initialiser is never run
        # and the comms methods are replaced by do-nothing stand-ins
        self.noComms = True
        self.OK = True
        self.sendMessage = self._nullSendMessage
        self.getResponse = self._nullGetResponse
    else:
        self.noComms = False
        # look for device on valid serial ports
        # parity="N",  # 'N'one, 'E'ven, 'O'dd, 'M'ask,
        serialdevice.SerialDevice.__init__(self, port=portName,
                                           baudrate=19200,
                                           byteSize=8, stopBits=1,
                                           parity="N",
                                           eol='\n',
                                           maxAttempts=1,
                                           pauseDuration=0.1,
                                           checkAwake=True)
    # presumably self.OK is set by SerialDevice.__init__ in the comms
    # branch - confirm; the object is left half-initialised if it is falsy
    if not self.OK:
        return

    self.win = win
    # NOTE(review): self.win is dereferenced here and in the callback
    # assignments below before the `self.win is not None` test further
    # down, so win=None looks like it raises AttributeError before the
    # final `else` branch can be reached - confirm.
    if self.noComms:
        self.frameRate = self.win.getActualFrameRate()
    else:
        # drain anything already waiting on the port
        msg='a'
        while msg:
            msg=self.read(timeout=0.1)
        self.sendMessage('$VideoFrameRate\r')
        self.pause()
        msg=self.read(timeout=0.1)
        # the second ';'-separated field of the reply is used as the rate
        msg2 = msg.split(b';')
        self.frameRate = float(msg2[1])
    self._setHeaders(self.frameRate)
    # flag for controlling analog outputs
    self.analog = False

    # replace window methods with our custom ones
    self.win._prepareFBOrender = self._prepareFBOrender
    self.win._finishFBOrender = self._finishFBOrender
    self.win._afterFBOrender = self._afterFBOrender

    # Bits++ doesn't do its own correction so we need to
    self.gammaCorrect = gammaCorrect
    self.gamma = gamma
    # we have a confirmed connection. Now check details about device and
    # system
    if not hasattr(self, 'info'):
        self.info = self.getInfo()
    self.config = None
    # goes through the `mode` property setter, which may message the box
    self.mode = mode
    if self.win is not None:
        if not hasattr(self.win, '_prepareFBOrender'):
            # NOTE(review): _prepareFBOrender was already assigned onto
            # the window above, so this test looks like it can never
            # fail - confirm.
            logging.error("BitsSharp was given an object as win "
                          "argument but this is not a visual.Window")
        self.win._prepareFBOrender = self._prepareFBOrender
        self.win._finishFBOrender = self._finishFBOrender
        self._setupShaders()
        # now check that we have a valid configuration of the box
        if checkConfigLevel:
            # the return value is not used afterwards
            ok = self.checkConfig(level=checkConfigLevel)
        else:
            # NOTE(review): self.config was set to None a few lines up;
            # unless something in between replaces it this attribute
            # access raises AttributeError - confirm.
            self.win.gammaRamp = self.config.identityLUT
    else:
        self.config = None  # makes no sense if we have a window?
        logging.warning("%s was not given any PsychoPy win" % (self))

    # members for controlling the RTBox functionality
    self.RTBoxMode = ['CB6','down','trigger']
    self.RTBoxEnabled = False
    # Make sure that RTBox event logging is off
    self.RTBoxDisable()
    # members for storing RTBox button presses
    self.RTButtons=[]  # list of button presses
    self.nRTPresses = 0  # number of button presses recorded

    # members for controlling the statusBox functionality
    self.statusBoxMode = ['CB6','down','trigger','analog']
    self.statusBoxEnabled = False
    self.statusButtonMap = [99]*23
    self.statusBoxThreshold = 9999.99
    # members for storing statusBox presses
    self.statusButtons=[]  # list of button presses
    self.nStatusPresses = 0  # number of button presses recorded

    # members for controlling status logging and reporting
    self.statusDINBase = 0b1111111111  # initial values for digital ins
    self.statusIRBase = 0b111111  # initial values for CB6 IR box
    self.statusTrigInBase = 0  # initial values for TrigIn
    self.statusADCBase = 0  # initial value for ADCs
    self.statusThreshold = 9999.99  # threshold for ADC change
    self.statusMode = ['up','down']  # Direction of events to be reported
    self.statusEnabled = False
    self._statusSize = 111
    # members for storing status logs and reports
    self.statusQ=Queue.Queue(70000)  # sets up a queue in which to store bits status events
    self.statusValues=[]  # full list of values recorded while logging the Bits# status
    self.status_nValues = 0  # number of status values recorded
    self.statusEvents=[]  # list of meaningful events extracted from log
    self.status_nEvents = 0  # number of events recorded
#==============================================#
# Some overloads as Bits++ and Bits# appear to #
# function slightly differently when it comes #
# to triggers #
#==============================================#
[docs]
def stopTrigger(self):
    """Stop sending triggers from the next win flip onwards.

    Only clears the ``trigger`` flag; nothing is sent to the device here.

    Example::

        bits.setTrigger(0b0000000010, 2.0, 4.0, 0b0111111111)
        bits.startTrigger()
        while imageOn:
            # do some processing
            continue
        bits.stopTrigger()
        bits.win.flip()
    """
    triggersOn = False
    self.trigger = triggersOn
[docs]
def stopGoggles(self):
    """Stop the stereo goggles from toggling.

    Calls ``_restoreTrigger()`` and then clears the ``gogglesGo`` flag.

    Example::

        bits.startGoggles(0,1)
        bits.win.flip()
        while not response:
            bits.win.flip()
            # do some processing
        bits.stopGoggles()
        bits.win.flip()

    Note it is safer to leave the goggles toggling forever, i.e. to never
    call stopGoggles().
    """
    # put back any protected trigger settings before the flag is cleared
    restore = self._restoreTrigger
    restore()
    self.gogglesGo = False
#===================================================#
# Some empty methods that we can use to replace #
# serial methods if noComms #
#===================================================#
def _nullSendMessage(self, message, autoLog=True):
    """Do-nothing stand-in for ``sendMessage`` (assigned when noComms)."""
    return None
def _nullGetResponse(self, length=1, timeout=0.1):
    """Do-nothing stand-in for ``getResponse`` (assigned when noComms)."""
    return None
#================================#
# Basic functionality #
#================================#
def __del__(self):
    """Close the serial port when the object is discarded so that the
    port is released. A missing ``com`` attribute (or any other
    AttributeError raised while closing) is silently ignored.
    """
    try:
        port = self.com
        port.close()
    except AttributeError:
        return
[docs]
def isAwake(self):
    """Test whether we have an active connection on the virtual serial
    port.

    Refreshes ``self.info`` via ``getInfo()`` and returns True when the
    reported ``'ProductType'`` is non-empty.
    """
    details = self.getInfo()
    self.info = details
    # a non-empty product type means a bits device answered
    productType = self.info['ProductType']
    return len(productType) != 0
[docs]
def getInfo(self):
    """Return a python dictionary of info about the Bits Sharp box.

    The keys are ``'ProductType'``, ``'SerialNumber'`` and
    ``'FirmwareDate'``. With noComms fixed placeholder strings are
    returned; otherwise each value is the device reply (bytes) with the
    echoed field header and the trailing terminator removed.

    Example::

        info = bits.getInfo()
        print(info['ProductType'])
    """
    if self.noComms:
        return dict(ProductType='Bits#',
                    SerialNumber='n/a',
                    FirmwareDate='n/a')
    self.sendMessage(b'$Stop\r')
    self.read(timeout=0.5)  # clear input buffer
    # (dict key, command, echoed header to strip, terminator to strip)
    queries = (
        ('ProductType', b'$ProductType\r', b'#ProductType;', b';\n\r'),
        ('SerialNumber', b'$SerialNumber\r', b'#SerialNumber;',
         b'\x00\n\r'),
        ('FirmwareDate', b'$FirmwareDate\r', b'#FirmwareDate;', b';\n\r'),
    )
    info = {}
    for key, command, header, terminator in queries:
        self.sendMessage(command)
        time.sleep(0.1)  # wait before reading the reply
        reply = self.read().replace(header, b'')
        info[key] = reply.replace(terminator, b'')
    return info
@property
def mode(self):
    """Get/set the mode of the BitsSharp to one of:

        - "bits++"
        - "mono++"
        - "color++"
        - "status"
        - "storage"
        - "auto"

    Setting None or '' just records an empty mode without messaging the
    box. Setting the mode that is already current does nothing. The
    mono, color and auto modes raise an Exception unless the window was
    created with useFBO=True; an unrecognised mode raises AttributeError.
    """
    return self.__dict__['mode']

@mode.setter
def mode(self, value):
    requiresFBO = 'mode requires a PsychoPy Window with useFBO=True'
    state = self.__dict__
    if value in [None, '']:
        state['mode'] = ''
        return
    if ('mode' in state) and value == self.mode:
        return  # nothing to do here. Move along please
    if value == 'status':
        # status mode returns without the log message issued below
        self.sendMessage(b'$statusScreen\r')
        state['mode'] = 'status'
        return

    # (prefix of requested name, error label, command, stored mode name)
    fboModes = (
        ('mono', "Mono++ ", b'$monoPlusPlus\r', 'mono++'),
        ('colo', "Color++ ", b'$colorPlusPlus\r', 'color++'),
        ('auto', "Auto++ ", b'$autoPlusPlus\r', 'auto++'),
    )
    if 'storage' in value.lower():
        self.sendMessage(b'$USB_massStorage\r')
        state['mode'] = 'massStorage'
    elif value.startswith('bits'):
        self.sendMessage(b'$BitsPlusPlus\r')
        state['mode'] = 'bits++'
        self.setLUT()
    else:
        for prefix, label, command, modeName in fboModes:
            if value.startswith(prefix):
                if not self.win.useFBO:
                    raise Exception(label + requiresFBO)
                self.sendMessage(command)
                state['mode'] = modeName
                break
        else:
            msg = ("Bits# doesn't know how to use mode "
                   "%r. Should be 'mono++', 'color++' etc")
            raise AttributeError(msg % value)
    logging.info('Switched %s to %s mode' % (self.info['ProductType'],
                                             self.__dict__['mode']))
[docs]
def setLUT(self, newLUT=None, gammaCorrect=False, LUTrange=1.0,
           contrast=None):
    """SetLUT is only really needed for bits++ mode of bits# to set the
    look-up table (256 values with 14bits each).

    For the BitsPlusPlus device the default is to perform gamma
    correction here but on the BitsSharp it seems better to have the
    device perform that itself as the last step, so gamma correction is
    off here by default.

    A ``contrast`` argument, when given, replaces ``self.contrast``. If
    none is given and no contrast has been set yet it is set to 1.0. The
    remaining arguments are passed on to ``BitsPlusPlus.setLUT``.
    """
    if contrast is None:
        if not hasattr(self, 'contrast'):
            # we don't have one yet so create a default
            self.contrast = 1.0
    else:
        # we were given a new contrast value so use it
        self.contrast = contrast
    BitsPlusPlus.setLUT(self, newLUT, gammaCorrect, LUTrange)
@property
def temporalDithering(self):
    """Temporal dithering can be set to True or False.

    Setting it sends the ON or OFF command to the box (chosen by the
    truthiness of the value) and then remembers the value as given.
    """
    return self.__dict__['temporalDithering']

@temporalDithering.setter
def temporalDithering(self, value):
    command = (b'$TemporalDithering=[ON]\r' if value
               else b'$TemporalDithering=[OFF]\r')
    self.sendMessage(command)
    self.__dict__['temporalDithering'] = value
@property
def gammaCorrectFile(self):
    """Get / set the gamma correction file to be used
    (as stored on the device).

    The name may be given as ``str`` or as bytes; it is sent to the box
    inside an ``$enableGammaCorrection=[...]`` command and the value is
    remembered exactly as it was given.
    """
    return self.__dict__['gammaCorrectFile']

@gammaCorrectFile.setter
def gammaCorrectFile(self, value):
    # bytes %-formatting only accepts bytes-like arguments for %s on
    # Python 3, so a plain str file name has to be encoded first.
    if isinstance(value, (bytes, bytearray)):
        fileName = value
    else:
        fileName = str(value).encode('utf-8')
    self.sendMessage(b'$enableGammaCorrection=[%s]\r' % (fileName,))
    self.__dict__['gammaCorrectFile'] = value
@property
def monitorEDID(self):
    """Get / set the EDID file for the monitor.

    The edid files will be located in the EDID subdirectory of the
    flash disk. The file `automatic.edid` will be the file read from
    the connected monitor.

    The name may be given as ``str`` or as bytes; it is sent to the box
    inside a ``$setMonitorType=[...]`` command and the value is
    remembered exactly as it was given.
    """
    return self.__dict__['monitorEDID']

@monitorEDID.setter
def monitorEDID(self, value):
    # bytes %-formatting only accepts bytes-like arguments for %s on
    # Python 3, so a plain str file name has to be encoded first.
    if isinstance(value, (bytes, bytearray)):
        fileName = value
    else:
        fileName = str(value).encode('utf-8')
    self.sendMessage(b'$setMonitorType=[%s]\r' % (fileName,))
    self.__dict__['monitorEDID'] = value
[docs]
def beep(self, freq=800, dur=1):
    """Make a beep of a given frequency and duration.

    ``freq`` is formatted as an integer and ``dur`` with four decimal
    places inside the ``$Beep=[...]`` command.
    """
    command = b'$Beep=[%i, %.4f]\r' % (freq, dur)
    self.sendMessage(command)
[docs]
def getVideoLine(self, lineN, nPixels, timeout=10.0, nAttempts=10):
    """Return the r,g,b values for a number of pixels on a particular
    video line.

    :param lineN: the line number you want to read
    :param nPixels: the number of pixels you want to read
    :param timeout: seconds allowed for each attempt before it is
        abandoned with a logged warning
    :param nAttempts: the first time you call this function it has
        to get to status mode. In this case it sometimes takes a few
        attempts to make the call work
    :return: an Nx3 numpy integer array, or None if every attempt
        failed to produce any values
    """
    for _attempt in range(nAttempts):
        self.com.flushInput()
        self.sendMessage(b'$GetVideoLine=[%i, %i]\r' % (lineN, nPixels))
        # the box implicitly ends up in status mode
        self.__dict__['mode'] = 'status'
        # accumulate the reply until enough ';'-separated fields arrived
        started = time.time()
        received = ""
        fields = []
        timedOut = False
        while len(fields) < (nPixels * 3):
            received += self.read(timeout=0.001).decode("utf-8")
            # the first and last ';'-separated pieces are discarded
            fields = received.split(';')[1:-1]
            if time.time() - started > timeout:
                msg = ("getVideoLine() timed out: only found %i pixels"
                       " in %.2f s")
                logging.warn(msg % (len(fields), timeout))
                timedOut = True
                break
        if timedOut:
            continue
        pixels = np.array(fields, dtype=int).reshape([-1, 3])
        if len(pixels):
            return pixels
    return None
#==============================================================#
# Bits# and Display++ comms functions #
#==============================================================#
[docs]
def read(self, timeout=0.1):
    """Get the current waiting characters from the serial port
    if there are any.

    Mostly used internally but may be needed by user. Note the
    return message depends on what state the device is in and will
    need to be decoded. See the Bits# manual but also the other functions
    herein that do the decoding for you.

    Returns None when noComms is set, otherwise whatever the port
    returns for the number of characters reported as waiting.

    Example::

        message = bits.read()
    """
    if not self.noComms:
        port = self.com
        port.timeout = timeout
        reply = port.read(self._inWaiting())
        # don't bother logging if we found nothing on input
        if reply:
            logging.debug("Got BitsSharp reply: %s" % (repr(reply)))
        return reply
    return None
[docs]
def flush(self):
    """Flushes the serial input buffer.

    It's good to do this before and after data collection,
    and generally quite often.

    Keeps reading (and discarding) with a 1 ms timeout for as long as
    characters are reported to be waiting.
    """
    while True:
        if not self._inWaiting() > 0:
            break
        self.read(0.001)
#=============================================================#
# Helper functions for comms #
#=============================================================#
def _inWaiting(self):
    """Helper function to determine how many bytes are waiting on
    the serial port. Always 0 when noComms is set.
    """
    return 0 if self.noComms else self.com.inWaiting()
#=============================================================#
# overload of _afterFBOrender for Bits# and Display++ #
#=============================================================#
def _afterFBOrender(self):
    """Bits# / Display++ version of the after-render hook, installed on
    the window as ``win._afterFBOrender`` by ``__init__``.

    With blending disabled it draws, in order:

    - the LUT when the mode name starts with ``'bits'``
    - the goggle pattern when ``gogglesGo`` is set, otherwise the
      trigger row when either ``analog`` or ``trigger`` is set
    - the clock reset when ``clockReset`` is set
    """
    GL.glDisable(GL.GL_BLEND)
    if self.mode.startswith('bits'):
        self._drawLUTtoScreen()
    if not self.gogglesGo:
        if self.analog or self.trigger:
            self._drawTrigtoScreen()
    else:
        # the goggle call also sends triggers if requested
        self._Goggles()
    if self.clockReset:
        self._ResetClock()
    GL.glEnable(GL.GL_BLEND)
#====================================================#
# Analog functions send voltages via the DAC outputs #
# But we also need to overload the setTrigger #
# functions to protect the analog settings. #
#====================================================#
[docs]
def setTrigger(self, triggers=0, onTime=0, duration=0,
               mask=0xFFFF):
    """Quick way to set up triggers, preserving the analog settings.

    :param triggers: binary word that determines which triggers will be
        turned on.
    :param onTime: start time of the trigger within the frame
        (in S with 100uS resolution).
    :param duration: how long the trigger will last
        (in S with 100uS resolution).
    :param mask: only protects the digital output lines set by other
        activities in the Bits. Not other triggers.

    NOTE(review): the example below passes 2.0 and 4.0 and describes them
    as milliseconds, which disagrees with the units stated above -
    confirm against BitsPlusPlus.setTrigger.

    Example::

        bits.setTrigger(0b0000000010, 2.0, 4.0, 0b0111111111)
        bits.startTrigger()

    Will issue a 4ms long high-going pulse, 2ms after the start
    of each frame on DOUT1 while protecting the value of DOUT 9.
    """
    # Protect the analog output settings.
    self._protectAnalog()
    super(BitsSharp, self).setTrigger(triggers,
                                      onTime,
                                      duration,
                                      mask)
    # Restore the analog output settings. This has to be a call: the
    # bare attribute reference used previously did nothing.
    self._restoreAnalog()
[docs]
def setTriggerList(self, triggerList=None, mask=0xFFFF):
    """Overload for Bits# and Display++.

    Sets up trigger pulses via the list method while preserving the
    analog output settings. This is the fine grained method that can
    control every trigger line at 100uS intervals.

    :param triggerList: should contain 1 entry for every 100uS packet
        (see getPackets); the binary word in each entry specifies which
        trigger lines will be active during that time slot.
    :param mask: only protects the digital output lines set by other
        activities in the Bits. Not other triggers.

    Example::

        packet = [0]*self._NumberPackets
        packet[0] = 0b0000000010
        bits.setTriggerList(packet)

    Will send a 100us pulse on DOUT1 at the start of the frame.

    Example 2::

        packet = [0]*self._NumberPackets
        packet[10] = 0b0000000010
        packet[20] = 0b0000000001
        bits.setTriggerList(packet)
        bits.startTrigger()

    Will send a 100us pulse on DOUT1 1000us after the start of the
    frame and a second 100us pulse on DOUT0 2000us after the start of
    the frame.

    Triggers will continue until stopTrigger is called.
    """
    # Protect the analog output settings.
    self._protectAnalog()
    super(BitsSharp, self).setTriggerList(triggerList,
                                          mask)
    # Restore the analog output settings. This has to be a call: the
    # bare attribute reference used previously did nothing.
    self._restoreAnalog()
[docs]
def setAnalog(self, AOUT1=0, AOUT2=0):
    """Sets up Analog outputs in Bits#.

    AOUT1 and AOUT2 are the two analog values required in volts.
    Analog commands are issued at the next win.flip() and
    actioned 1 video frame later.

    Each voltage is scaled by 32767/5 and rounded; negative results have
    65535 added. The high and low bytes of the result are written into
    channels 1 and 2 of row 11 (AOUT1) or row 13 (AOUT2) of the trigger
    payload and of all four goggle payloads.

    Example::

        bits.setAnalog(4.5,-2.2)
        bits.startAnalog()
        bits.win.flip()
    """
    payloads = (self._HEADandTrig,
                self._HEADandGogLeftOpen,
                self._HEADandGogRightOpen,
                self._HEADandGogBothOpen,
                self._HEADandGogBothClosed)
    for row, volts in ((11, AOUT1), (13, AOUT2)):
        # Convert from volts to device units.
        units = int(np.round(32767.0 * volts / 5.0, 0))
        if units < 0:
            units = 65535 + units
        # Add the analog value to every payload as a high/low byte pair
        highByte, lowByte = divmod(units, 256)
        for payload in payloads:
            payload[row, :, 1] = highByte
            payload[row, :, 2] = lowByte
    # Convert the trigger only payload to bytes; goggle payloads are
    # dealt with later. tobytes() replaces the deprecated tostring().
    self._HEADandTrigStr = self._HEADandTrig.tobytes()
[docs]
def sendAnalog(self,AOUT1 = 0, AOUT2 = 0):
    """Sends a single analog output pulse; uses up 1 win flip.

    The pulse will continue until the next win flip is called.
    Actions are always 1 frame behind the request.
    May conflict with trigger and goggle settings.

    Example::

        bits.sendAnalog(4.5,-2.0)
        bits.win.flip()
    """
    digitalTriggersOn = self.trigger
    if not digitalTriggersOn:
        # protect any trigger settings and set up a blank trigger;
        # goggle settings are always preserved
        self._protectTrigger()
    self.setAnalog(AOUT1=AOUT1, AOUT2=AOUT2)
    self.analog = True
    # Send the pulse; it is not acted on until the next frame. If the
    # next win flip is late the trigger will be repeated until it is
    # cleared by that flip.
    self.win.flip()
    self.analog = False  # cancel any future analog outs
    self._restoreTrigger()  # restore the old digital triggers
[docs]
def startAnalog(self):
    """Will start sending analog signals on the next win flip and
    continue until stopped.

    Example::

        bits.setAnalog(4.5,-2.2)
        bits.startAnalog()
        bits.win.flip()
    """
    digitalTriggersOn = self.trigger
    if not digitalTriggersOn:
        # protect any trigger settings and set up a blank trigger;
        # goggle settings are always preserved
        self._protectTrigger()
    self.analog = True
[docs]
def stopAnalog(self):
    """Will stop sending analog signals at the next win flip.

    Calls ``_restoreTrigger()`` and then clears the ``analog`` flag.

    Example::

        bits.setAnalog(4.5,-2.2)
        bits.startAnalog()
        bits.win.flip()
        while not response:
            # do some processing.
            bits.win.flip()
        bits.stopAnalog()
        bits.win.flip()
    """
    # put back any protected trigger settings before the flag is cleared
    restore = self._restoreTrigger
    restore()
    self.analog = False
#============================================================#
# Helper functions for protecting the analog settings from #
# bits++ trigger functions #
#============================================================#
def _protectAnalog(self):
    """Remember the analog bytes currently held in the trigger payload.

    Channels 1 and 2 of rows 11 and 13 of ``self._HEADandTrig`` are saved
    as ``keepA1MS``, ``keepA1LS``, ``keepA2MS`` and ``keepA2LS`` so that
    ``_restoreAnalog`` can write them back later.
    """
    # Basic slicing returns a view onto the payload, so copies are taken:
    # otherwise any later in-place write to _HEADandTrig would silently
    # change the "saved" values as well.
    self.keepA1MS = self._HEADandTrig[11, :, 1].copy()
    self.keepA1LS = self._HEADandTrig[11, :, 2].copy()
    self.keepA2MS = self._HEADandTrig[13, :, 1].copy()
    self.keepA2LS = self._HEADandTrig[13, :, 2].copy()
def _restoreAnalog(self):
    """Write the saved analog bytes back into every payload.

    The values stored by ``_protectAnalog`` (``keepA1MS``, ``keepA1LS``,
    ``keepA2MS``, ``keepA2LS``) are copied into channels 1 and 2 of rows
    11 and 13 of the four goggle payloads and of the trigger payload,
    after which ``self._HEADandTrigStr`` is rebuilt.
    """
    payloads = (self._HEADandGogLeftOpen,
                self._HEADandGogRightOpen,
                self._HEADandGogBothOpen,
                self._HEADandGogBothClosed,
                self._HEADandTrig)
    for payload in payloads:
        payload[11, :, 1] = self.keepA1MS
        payload[11, :, 2] = self.keepA1LS
        payload[13, :, 1] = self.keepA2MS
        payload[13, :, 2] = self.keepA2LS
    # tobytes() replaces the deprecated ndarray.tostring()
    self._HEADandTrigStr = self._HEADandTrig.tobytes()
#============================================================#
# RTBox. Bits# and Display++ can mimic a RT Box physical #
# Reaction time response box #
# see https://lobes.osu.edu/rt-box.php #
# These RTBox functions use the RTBox comms format to #
# read button press and trigger events #
#============================================================#
[docs]
def RTBoxClear(self):
    """Flushes the serial input buffer.

    It's good to do this before and after data collection.
    This is just a wrapper around flush() for RTBox use.
    """
    emptyInput = self.flush
    emptyInput()
[docs]
def setRTBoxMode(self, mode=['CB6','Down','Trigger']):
    """Sets the RTBox mode data member - does not
    actually set the RTBox into this mode.

    :param mode: list of mode keywords. A single string is wrapped in a
        list and None selects the default ``['CB6','Down','Trigger']``.

    Example::

        bits.setRTBoxMode(['CB6','Down']) # set the mode
        bits.RTBoxEnable() # Enable RTBox emulation with
                           # the preset mode.

    sets the RTBox mode settings for a CRS CB6 button box
    and for detection of 'Down' events only.
    """
    if mode is None:
        mode = ['CB6', 'Down', 'Trigger']
    elif isinstance(mode, str):
        mode = [mode]
    # Store a private copy. RTBoxEnable appends keywords to
    # self.RTBoxMode, which would otherwise grow the shared default list
    # of this signature (or the caller's own list) on every use.
    self.RTBoxMode = list(mode)
[docs]
def RTBoxEnable(self, mode=None, map=None):
    """Sets up the RTBox with preset or bespoke mappings
    and enables event detection.

    RTBox events can be mapped to a number of physical events on Bits#.
    They can be mapped to digital input lines, triggers and
    CB6 IR input channels.

    :param mode: a list of keyword strings (a single string is also
        accepted). If None or not set then the value of self.RTBoxMode
        is used. The list is copied, so the caller's list is not changed.
    :param map: bespoke mapping that overrides the preset ones. A list
        of tuples, each containing the name of the RT Box button to be
        mapped and its source, e.g. ('btn1','Din0') maps physical input
        Din0 to logical button btn1.
    :raises AssertionError: if status logging or the statusBox is on.
    :raises Exception: if the box does not acknowledge one of the
        requested modes.

    Preset mappings provided via mode:

        * `CB6` for the CRS CB6 IR response box.
        * `IO` for a three button box connected to Din0-2
        * `IO6` for a six button box connected to Din0-5

    Note the lowest number button event is Btn1.
    RTBox has four logical buttons (btn1-4) and
    three auxiliary events (light, pulse and trigger).
    Buttons/events can be mapped to multiple physical
    inputs and stay mapped until reset.

    Direction and event keywords in mode:

        * If mode includes 'Down' button events will be detected when
          pressed.
        * If mode includes 'Up' button events will be detected when
          released.
        * If 'Trigger' is included the trigger event will be mapped to
          the trigIn connector.

    You can detect both types of event but note that pulse, light and
    trigger events don't have an 'Up' mode. If mode does not contain
    'Up', 'Down' is added as the default.

    Example::

        bits.RTBoxEnable(mode = ['Down'], map = [('btn1','Din0'), ('btn2','Din1')])

    enables the RTBox emulation to detect Down events on buttons 1 and 2
    where they are mapped to DIN0 and DIN1.

    Example::

        bits.RTBoxEnable(mode = ['Down','CB6'])

    enables the RTBox emulation to detect Down events on the standard
    CB6 IR response box keys.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Also Bits# units can be configured in various ways via their
    config.xml file so this class makes certain assumptions about the
    configuration. Such variations may affect key mappings for RTBox
    commands. The ability to reset key mappings has been found not to
    work on some Bits# firmware.
    """
    if self.noComms:
        return
    if self.statusEnabled:
        warning = ("Cannot use RTBox when status logging is on")
        raise AssertionError(warning)
    if self.statusBoxEnabled:
        warning = ("Cannot use RTBox when statusBox is on ")
        raise AssertionError(warning)

    # Use the supplied mode if there is one, otherwise the preset mode.
    if mode is not None:
        self.RTBoxMode = mode
    # If nothing has been set at all fall back on the setRTBoxMode
    # default (this has to be a call, a bare reference does nothing).
    if self.RTBoxMode is None:
        self.setRTBoxMode()
    # Work on a private list: keywords are appended below and that must
    # not leak into the caller's list or into a shared default list.
    if isinstance(self.RTBoxMode, str):
        self.RTBoxMode = [self.RTBoxMode]
    else:
        self.RTBoxMode = list(self.RTBoxMode)
    modes = self.RTBoxMode

    def wants(*keywords):
        # True if any spelling of the keyword was requested
        return any(keyword in modes for keyword in keywords)

    def addMode(keyword):
        # append without piling up duplicates over repeated calls
        if keyword not in modes:
            modes.append(keyword)

    # If 'Up' is not in mode add 'Down' in case no direction was set.
    if not wants('Up', 'up'):
        addMode('Down')

    self.RTBoxResetKeys()  # reset all the button - input mappings.

    # A bespoke map overrides any presets given by mode.
    # NOTE(review): the three preset tests below are treated as siblings;
    # the nesting of the 'IO6' test could not be verified - confirm.
    if map is None:
        if 'CB6' in modes:
            self.sendMessage(b'$btn1 =[IRButtonA]\r')
            self.sendMessage(b'$btn2 =[IRButtonB]\r')
            self.sendMessage(b'$btn3 =[IRButtonC]\r')
            self.sendMessage(b'$btn4 =[IRButtonD]\r')
            self.sendMessage(b'$light =[IRButtonE]\r')
            self.sendMessage(b'$pulse =[IRButtonF]\r')
            # pulse and light are required to read more than 4 inputs
            addMode('Pulse')
            addMode('Light')
        # For a 3 or 6 button box wired to the DINs
        if 'IO' in modes:
            self.sendMessage(b'$btn1 =[Din0]\r')
            self.sendMessage(b'$btn2 =[Din1]\r')
            self.sendMessage(b'$btn3 =[Din2]\r')
        # Add 3 more buttons wired to the DINs
        if 'IO6' in modes:
            self.sendMessage(b'$btn4 =[Din3]\r')
            self.sendMessage(b'$light =[Din4]\r')
            self.sendMessage(b'$pulse =[Din5]\r')
            # pulse and light are required to read more than 4 inputs
            addMode('Pulse')
            addMode('Light')
        # Add the TrigIn input as a possible event.
        if wants('Trigger', 'trigger'):
            self.sendMessage(b'$trigger =[TrigIn]\r')
    else:
        self.RTBoxSetKeys(map)  # Set up a bespoke mapping

    self.sendMessage('X')  # Advanced mode turns timestamping on
    time.sleep(0.1)
    msg = self.read(0.1)
    # Assumes that BitsSharp and Display++ return BOX as part of
    # their ID in respect of RTBox style commands.
    if b'BOX' in msg:
        smsg = msg.split(b',')
        # Get the timebase for timestamps
        self.RTBoxTimeBase = float(smsg[1])
        logging.debug("Put RTBox into advanced mode: box ID = %s" %(msg))
    else:
        raise Exception("Cannot get RTBox into "
                        "advanced mode - this is needed for timestamping")

    # (keywords, command, expected echo, success log, failure message)
    handshakes = (
        (('Down', 'down'), 'D', b'D',
         "Put RTBox into key down mode",
         "Cannot get RTBox into key down mode"),
        (('Up', 'up'), 'U', b'U',
         "Put RTBox into key up mode",
         "Cannot get RTBox into key up mode"),
        (('Trigger', 'trigger'), 'F', b'F',
         "Put RTBox into TR mode",
         "Cannot get RTBox into TR mode"),
        (('Light', 'light'), 'O', b'O',
         "Put RTBox into Light mode",
         "Cannot get RTBox into Light mode"),
        (('Pulse', 'pulse'), 'P', b'P',
         "Put RTBox into Pulse mode",
         "Cannot get RTBox into Pulse mode"),
    )
    for keywords, command, echo, done, failed in handshakes:
        if not wants(*keywords):
            continue
        self.sendMessage(command)
        time.sleep(0.1)
        # the box echoes the command letter when it accepts the mode
        if self.read(0.1) == echo:
            logging.debug(done)
        else:
            raise Exception(failed)

    self.RTBoxEnabled = True
    logging.debug("RTBox emulation enabled")
    self.RTBoxClear()
[docs]
def RTBoxDisable(self):
    """Disable the detection of RTBox events.

    Useful to stop the Bits# from reporting key presses when they are
    no longer needed; this must be done before using any other data
    logging method. Every mode named in ``self.RTBoxMode`` is switched
    off in turn (down, up, trigger, light, pulse), then the button to
    input mappings are reset and the input buffer is cleared.

    Raises an ``Exception`` if the device does not echo back one of the
    mode commands.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands. The ability to reset key mappings has been found not to
    work on some Bits# firmware.
    """
    if self.noComms:
        self.RTBoxEnabled = False
        return
    self.flush()
    # One row per RTBox mode:
    # (keywords looked for in RTBoxMode, command sent, echo expected,
    #  description used in the log / error message)
    modeTable = (
        (('Down', 'down'), 'd', b'd', "key down mode"),
        (('Up', 'up'), 'u', b'u', "key up mode"),
        (('Trigger', 'trigger'), 'f', b'f', "TR mode"),
        (('Light', 'light'), 'o', b'o', "Light mode"),
        (('Pulse', 'pulse'), 'p', b'p', "Pulse mode"),
    )
    for keywords, command, echo, label in modeTable:
        if not any(word in self.RTBoxMode for word in keywords):
            continue
        self.sendMessage(command)
        time.sleep(0.1)  # give the device time to answer
        reply = self.read(0.1)
        if reply != echo:
            raise Exception("Cannot take RTBox out of " + label)
        logging.debug("Take RTBox out of " + label)
    self.RTBoxClear()
    # Reset the button mappings
    self.RTBoxResetKeys()
    self.RTBoxClear()
    self.RTBoxEnabled = False
    logging.debug("RTBox emulation disabled")
[docs]
def RTBoxResetKeys(self):
    """Reset the key mappings to no mapping.

    Sends a ``[null]`` mapping for each of the four buttons and for the
    light, pulse and trigger events, which has the effect of disabling
    RTBox input. Does nothing when there are no communications.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands. The ability to reset key mappings has been found not to
    work on some Bits# firmware.
    """
    if self.noComms:
        return
    eventNames = (b'btn1', b'btn2', b'btn3', b'btn4',
                  b'light', b'pulse', b'trigger')
    for eventName in eventNames:
        self.sendMessage(b'$' + eventName + b' =[null]\r')
    logging.debug("All RTBOX buttons disconnected")
[docs]
def RTBoxSetKeys(self, map):
    """Set key mappings: reset each listed event, then map it afresh.

    Events that are not in the new list are left untouched.

    RTBox events can be mapped to a number of physical events on Bits#:
    digital input lines, triggers and CB6 IR input channels.

    ``map`` is a list of tuples, each holding the name of the RTBox
    event to be mapped and its source, e.g. ``('btn1', 'Din1')`` maps
    physical input Din1 to logical button btn1.

    RTBox has four logical buttons (btn1-4) and three auxiliary events
    (light, pulse and trigger). Buttons/events can be mapped to
    multiple physical inputs and stay mapped until reset.

    Example::

        bits.RTBoxSetKeys([('btn1', 'Din0'), ('light', 'Din9')])

    will link Din0 to button 1 and Din9 to the light input emulation.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    if self.noComms:
        return
    for entry in map:
        target = '$' + entry[0]
        # Clear whatever was mapped to this event ...
        self.sendMessage(target + '=[null]\r')
        # ... then send the new mapping.
        self.sendMessage(target + '=[' + entry[1] + ']\r')
[docs]
def RTBoxAddKeys(self, map):
    """Add key mappings to an existing map.

    RTBox events can be mapped to a number of physical events on Bits#:
    digital input lines, triggers and CB6 IR input channels.

    ``map`` is a list of tuples, each holding the name of the RTBox
    event to be mapped and its source, e.g. ``('btn1', 'Din1')`` maps
    physical input Din1 to logical button btn1.

    RTBox has four logical buttons (btn1-4) and three auxiliary events
    (light, pulse and trigger). Buttons/events can be mapped to
    multiple physical inputs and stay mapped until reset.

    Example::

        bits.RTBoxSetKeys([('btn1', 'Din0'), ('btn2', 'Din1')])
        bits.RTBoxAddKeys([('btn1', 'IRButtonA'), ('btn2', 'IRButtonB')])

    will link Din0 to button 1 and Din1 to button 2, then add IRButtonA
    and IRButtonB alongside the original mappings. Now both hard wired
    and IR inputs will emulate the same logical button press.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    if self.noComms:
        return
    for entry in map:
        # Build the mapping command for this event and send it.
        command = '$' + entry[0] + '=[' + entry[1] + ']\r'
        self.sendMessage(command)
[docs]
def RTBoxCalibrate(self, N=1):
    """Assess the error between the host clock and Bits# time stamps.

    Prompts for ``N`` button presses, prints each sample (index, Bits#
    time, host time, difference) and returns the accumulated
    difference divided by ``N``.

    The clocks will never be completely in sync; the aim is that the
    difference between them should not grow over a series of button
    presses.

    If a wait returns nothing the accumulated difference is reset to
    zero.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    print("Calibrating Bits Button box: Press button %d times" %(N))
    hostClock = core.Clock()
    # Sync the two clocks as best they can be by resetting them both.
    self.syncClocks(hostClock)
    totalError = 0
    # Compare the time stamp of each button press with the host clock.
    for pressIndex in range(N):
        press = self.RTBoxWait()
        if not press:
            totalError = 0
            continue
        hostTime = hostClock.getTime()
        boxTime = press.time
        totalError = totalError + hostTime - boxTime
        print(pressIndex, boxTime, hostTime, hostTime - boxTime)
    return totalError / N
[docs]
def getRTBoxResponses(self, N=1):
    """Read ``N`` RTBox style key presses if enough data is waiting.

    Checks that at least ``N`` events (7 bytes each) are waiting on the
    input buffer, then reads the buffer and decodes ``N`` events from
    it. Returns ``None`` when there are no communications or when
    fewer than ``N`` events are waiting.

    Returns a list of dict like objects with three members:
    'button', 'dir' and 'time'.

    'button' is a number from 1 to 9 indicating the event that was
    detected: 1-4 are the 'btn1-btn4' events, 5 and 6 are the 'light'
    and 'pulse' events, 7 is the 'trigger' event and 9 is a requested
    timestamp event (see clock()).

    'dir' is the direction of the event, e.g. 'up' or 'down'; trigger
    is described as 'on' when low. 'dir' is set to 'time' if a
    requested timestamp event has been detected.

    'time' is the timestamp associated with the event.

    Values can be read as a list of structures, e.g.::

        res = getRTBoxResponses(3)
        res[0].dir, res[0].button, res[0].time

    or dictionaries::

        res[0]['dir'], res[0]['button'], res[0]['time']

    Note that even if only 1 key press was requested a list of
    dict / objects is returned.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    if self.noComms:
        return None
    # Each event occupies 7 bytes on the serial input.
    lastByteNeeded = N * 7 - 1
    if self._inWaiting() <= lastByteNeeded:
        return None
    rawBytes = self.read()
    return self._RTBoxDecodeResponse(rawBytes, N)
[docs]
def getRTBoxResponse(self):
    """Read one RTBox style key press from the input buffer.

    Returns a dict like object with three members, 'button', 'dir' and
    'time', or ``None`` when no complete event could be read on this
    call (including when there are no communications).

    The result of this call's own read is used, so an event returned
    by an earlier call is never reported a second time.

    'button' is a number from 1 to 9 indicating the event that was
    detected: 1-4 are the 'btn1-btn4' events, 5 and 6 are the 'light'
    and 'pulse' events, 7 is the 'trigger' event and 9 is a requested
    timestamp event (see clock()).

    'dir' is the direction of the event, e.g. 'up' or 'down'; trigger
    is described as 'on' when low. 'dir' is set to 'time' if a
    requested timestamp event has been detected.

    'time' is the timestamp associated with the event.

    The value can be read as a structure, e.g.::

        res = getRTBoxResponse()
        res.dir, res.button, res.time

    or as a dictionary::

        res['dir'], res['button'], res['time']

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    responses = self.getRTBoxResponses(1)
    # getRTBoxResponses returns None when nothing could be read; do not
    # fall back on self.RTButtons, which still holds the events decoded
    # by the previous successful read.
    if not responses:
        return None
    return responses[0]
[docs]
def getAllRTBoxResponses(self):
    """Read all of the RTBox style key presses on the input buffer.

    The number of events is taken to be the number of waiting bytes
    divided by 7 (the size of one event); that many events are then
    requested from getRTBoxResponses().

    Returns a list of dict like objects with three members:
    'button', 'dir' and 'time'.

    'button' is a number from 1 to 9 indicating the event that was
    detected: 1-4 are the 'btn1-btn4' events, 5 and 6 are the 'light'
    and 'pulse' events, 7 is the 'trigger' event and 9 is a requested
    timestamp event (see clock()).

    'dir' is the direction of the event, e.g. 'up' or 'down'; trigger
    is described as 'on' when low. 'dir' is set to 'time' if a
    requested timestamp event has been detected.

    'time' is the timestamp associated with the event.

    Values can be read as a structure, e.g.::

        res = getAllRTBoxResponses()
        res[0].dir, res[0].button, res[0].time

    or as a dictionary::

        res[0]['dir'], res[0]['button'], res[0]['time']

    Note that even if only 1 key press was found a list of
    dict / objects is returned.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    bytesWaiting = self._inWaiting()
    # 7 bytes per event; a trailing partial event is not counted.
    eventCount = int(bytesWaiting / 7)
    return self.getRTBoxResponses(eventCount)
[docs]
def RTBoxKeysPressed(self, N=1):
    """Check whether at least ``N`` RTBox style key presses are waiting.

    Example::

        bits.RTBoxKeysPressed(5)

    will return False until 5 button presses have been recorded.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    # One press occupies 7 bytes on the input buffer.
    return self._inWaiting() > 7 * N - 1
[docs]
def RTBoxWaitN(self, N=1):
    """Wait until at least ``N`` RTBox style key presses have been made.

    Program execution is paused in the mean time (the input buffer is
    polled in a tight loop). Returns ``None`` straight away when there
    are no communications.

    Example::

        res = bits.RTBoxWaitN(5)

    will suspend all other activity until 5 button presses have been
    recorded and will then return a list of dicts containing the 5
    results.

    Results can be accessed as a structure::

        res[0].dir, res[0].button, res[0].time

    or as a dictionary::

        res[0]['dir'], res[0]['button'], res[0]['time']

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    if self.noComms:
        return None
    bytesNeeded = 7 * N  # 7 bytes per key press
    while True:
        if self._inWaiting() >= bytesNeeded:
            break
    return self.getRTBoxResponses(N)
[docs]
def RTBoxWait(self):
    """Wait until at least one RTBox style key press has been made.

    Program execution is paused in the mean time (the input buffer is
    polled in a tight loop). Returns ``None`` straight away when there
    are no communications.

    Example::

        res = bits.RTBoxWait()

    will suspend all other activity until 1 button press has been
    recorded and will then return a dict / structure containing the
    result.

    Results can be accessed as a structure::

        res.dir, res.button, res.time

    or as a dictionary::

        res['dir'], res['button'], res['time']

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration; such variations may affect key mappings for RTBox
    commands.
    """
    if self.noComms:
        return None
    while True:
        # A single key press occupies 7 bytes.
        if self._inWaiting() >= 7:
            break
    return self.getRTBoxResponse()
[docs]
def clock(self):
    """Read the internal clock of the Bits box via the RTBox format.

    Note that there will be a delay in reading the value back.

    The format of the return value is the same as for button box
    presses: the value for button will be 9, the value for the event
    direction will be 'time', and the value for time will be the time
    of the clock at the moment of the request.

    Example::

        res = bits.clock()
        print(res.time)
        print(res['time'])
    """
    # Drop anything already pending so the next event is the time stamp.
    self.RTBoxClear()
    self.sendMessage('Y')
    stamp = self.RTBoxWait()
    return stamp
# Helper function for RTBox commands #
def _RTBoxDecodeResponse(self, msg, N=1):
    """Helper function for decoding key presses in the RT response
    box format.

    Not normally needed by the user.

    ``msg`` holds the raw bytes read from the device and ``N`` is the
    number of 7-byte events to decode from its start. Each event is
    one event-code byte followed by a 6-byte, most-significant-byte
    first time stamp, which is divided by ``self.RTBoxTimeBase``.

    The decoded events are stored in ``self.RTButtons`` (and their
    count in ``self.nRTPresses``) and the list is returned. Each entry
    has its ``dir``, ``button`` and ``time`` members set; an
    unrecognised event code is reported as dir 'None', button 99.

    Raises IndexError if ``msg`` holds fewer than ``N`` complete events
    and AssertionError when not running under Python 3.
    """
    # Event code -> (direction, logical button number)
    eventTable = {
        # Calls to read the clock are a special case.
        9: ('time', 9),
        # Odd numbered events 49 - 55 are the down events for the 4
        # btn inputs.
        49: ('down', 1),
        51: ('down', 2),
        53: ('down', 3),
        55: ('down', 4),
        # Even numbered events 50 - 56 are the up events for the 4
        # btn inputs.
        50: ('up', 1),
        52: ('up', 2),
        54: ('up', 3),
        56: ('up', 4),
        # Event 48 is decoded to button 5, down only.
        48: ('down', 5),
        # Event 57 is decoded to button 6, down only.
        57: ('down', 6),
        # Event 97 is the trigger event.
        97: ('on', 7),
    }
    self.RTButtons = [button() for i in range(N)]
    self.nRTPresses = N
    for index in range(N):
        if sys.version_info[0] != 3:
            raise AssertionError("Bits# RTBox Only tested for PY3")
        frame = msg[index * 7:index * 7 + 7]
        if len(frame) < 7:
            raise IndexError("RTBox message too short: event %d needs "
                             "7 bytes" % index)
        # Bytes 1-6 of the frame are the time stamp.
        t = int.from_bytes(frame[1:7], 'big') / self.RTBoxTimeBase
        # A 'Y' event code marks a requested time stamp. Look at this
        # frame's own code byte, not the first byte of the whole
        # message, so that mixed messages decode correctly.
        if frame[0] == ord('Y'):
            event = 9
        else:
            event = frame[0]
        direction, number = eventTable.get(event, ('None', 99))
        self.RTButtons[index].dir = direction
        self.RTButtons[index].button = number
        self.RTButtons[index].time = t
    return self.RTButtons
#====================================================================#
# 'statusBox' functions use BitsSharp status reporting to #
# emulate a button box. #
#====================================================================#
[docs]
def setStatusBoxMode(self, mode=None):
    """Set the statusBox mode data member.

    This does not actually put the statusBox into this mode; the stored
    value is used by statusBoxEnable() when no mode is given there.

    ``mode`` is a string or list of strings. When omitted (or None) a
    fresh ``['CB6', 'Down', 'Trigger', 'Analog']`` list is stored, so
    the default is never shared between calls or instances.

    Example::

        bits.setStatusBoxMode(['CB6', 'Down'])  # set the mode
        bits.statusBoxEnable()  # enable status box emulation with
                                # the preset mode

    sets the statusBox mode settings for a CRS CB6 button box and for
    detection of 'Down' events only.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    if mode is None:
        # Build the default here rather than in the signature: a list
        # default would be one shared object that any later in-place
        # change to statusBoxMode would silently alter for everyone.
        mode = ['CB6', 'Down', 'Trigger', 'Analog']
    self.statusBoxMode = mode
[docs]
def setStatusBoxThreshold(self, threshold=None):
    """Set the threshold by which analog inputs must change to trigger
    a button press event.

    If ``threshold`` is None the threshold is set very high (9999.99)
    so that no such events are triggered. Any other value, including
    0, is stored as given.

    Can be used to change the threshold for analog events without
    having to re-enable the status box system as a whole.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    if threshold is None:
        # Impossibly high, to block analog events.
        self.statusBoxThreshold = 9999.99
    else:
        self.statusBoxThreshold = threshold
[docs]
def statusBoxEnable(self, mode=None, map=None, threshold=None):
    """Set up the statusBox with preset or bespoke mappings and enable
    event detection.

    statusBox events can be mapped to a number of physical events on
    Bits#: digital input lines, triggers, CB6 IR input channels and
    analog inputs.

    ``mode`` is a string or list of strings containing keywords that
    select preset mappings and event types. If ``mode`` is None the
    value of ``self.statusBoxMode`` is used.

    Preset mappings provided via mode:

    - CB6 for the CRS CB6 IR response box, mapped to btn1-6
    - IO for a three button box connected to Din0-2, mapped to btn1-3
    - IO6 for a six button box connected to Din0-5, mapped to btn1-6
    - IO10 for a ten button box connected to Din0-9, mapped to btn1-10
    - Trigger maps the TrigIn to btn17
    - Analog maps the 6 analog inputs on a Bits# to btn18-23

    IO6 includes the IO mapping and IO10 includes the IO6 mapping, so
    naming the largest box is enough. If CB6 and IOx are used together
    the Dins are mapped from btn7 onwards.

    If mode includes 'Down', button events will be detected when
    pressed; if it includes 'Up', when released. You can detect both
    types of event, noting that the event detector will look for
    transitions and ignore what it sees as the starting state.

    ``map`` is a bespoke mapping which, when given, is used instead of
    the presets. It is a list of tuples, each holding the name of the
    button to be mapped and its source, e.g. ``('btn1', 'Din0')`` maps
    physical input Din0 to logical button btn1. The lowest numbered
    button is btn1; statusBox has 23 logical buttons (btn1-23).

    To match the CRS hardware description inputs are labelled:
    TrigIn, Din0 ... Din9, IRButtonA ... IRButtonF,
    AnalogIn1 ... AnalogIn6.

    ``threshold`` sets the amount by which analog inputs must change to
    trigger a button press event. If None the threshold is set very
    high so that no such events are triggered. Analog inputs must
    cycle up and down by threshold to be detected as separate events:
    if only 'Up' events are detected the input must go up by
    threshold, come down again and then go back up to register 2 up
    events.

    Raises AssertionError if status logging or RTBox emulation is
    currently enabled. Does nothing when there are no communications.

    Example::

        bits.statusBoxEnable(mode='Down',
                             map=[('btn1', 'Din0'), ('btn2', 'Din1')])

    enables the statusBox to detect Down events on buttons 1 and 2
    where they are mapped to Din0 and Din1.

    Example::

        bits.statusBoxEnable(mode=['Down', 'CB6'])

    enables the status box emulation to detect Down events on the
    standard CB6 IR response box keys.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    if self.noComms:
        return
    if self.statusEnabled:
        warning = ("Cannot use statusBox when status logging "
                   " is on")
        raise AssertionError(warning)
    if self.RTBoxEnabled:
        warning = ("Cannot use statusBox when RTBox is on ")
        raise AssertionError(warning)

    # Use the supplied threshold for analog events, or block them.
    if threshold is not None:
        self.statusBoxThreshold = threshold
    else:
        self.statusBoxThreshold = 9999.99  # impossibly high

    # Use the supplied mode, otherwise keep the preset or default mode.
    if mode is not None:
        self.statusBoxMode = mode

    self.statusBoxResetKeys()  # reset all the button - input mappings.

    # statusButtonMap is indexed by position in the status line and
    # holds the logical button number; 99 marks an unmapped input.
    self.statusButtonMap = [99]*23
    if map is None:
        presets = self.statusBoxMode
        dinBaseButton = 1
        if 'CB6' in presets:
            # IR buttons A-F occupy positions 11-16.
            for offset in range(6):
                self.statusButtonMap[11 + offset] = 1 + offset
            dinBaseButton = 7  # Dins now start at btn7
        # The wired boxes are cumulative: IO10 covers IO6 covers IO.
        # Each keyword is tested on its own because 'IO' only matches
        # 'IO6'/'IO10' when mode is a single string, never in a list.
        useIO10 = 'IO10' in presets
        useIO6 = useIO10 or ('IO6' in presets)
        useIO = useIO6 or ('IO' in presets)
        if useIO10:
            dinCount = 10
        elif useIO6:
            dinCount = 6
        elif useIO:
            dinCount = 3
        else:
            dinCount = 0
        # Din0-9 occupy positions 1-10.
        for din in range(dinCount):
            self.statusButtonMap[1 + din] = din + dinBaseButton
        if ('Trigger' in presets) or ('trigger' in presets):
            self.statusButtonMap[0] = 17
        if ('Analog' in presets) or ('analog' in presets):
            # Analog inputs 1-6 occupy positions 17-22.
            for offset in range(6):
                self.statusButtonMap[17 + offset] = 18 + offset
    else:
        self.statusBoxSetKeys(map)  # Set up a bespoke mapping

    self.statusBoxEnabled = True
    logging.debug("statusBox emulation enabled")
    self.flush()
    # Start status reading on a background thread. statusBoxEnd is the
    # flag statusBoxDisable() later sets to ask for a stop, so it is
    # cleared before the thread starts.
    self.statusThread = threading.Thread(target=self._statusBox)
    self.statusBoxEnd = False
    self._statusEnable()
    self.statusThread.start()
[docs]
def statusBoxDisable(self):
    """Disable the detection of statusBox events.

    Useful to stop the Bits# from reporting key presses when they are
    no longer needed; this must be done before using any other data
    logging method.

    Asks the status reading thread to stop, waits for it to finish,
    then flushes the connection and resets the key mappings. When
    there are no communications only the enabled flag is cleared.
    """
    commsActive = not self.noComms
    if commsActive:
        # statusBoxEnd is the flag that tells the status thread to stop.
        self.statusBoxEnd = True
        # Wait for the thread to finish before discarding it.
        worker = self.statusThread
        worker.join()
        del self.statusThread
        self.flush()
        self.statusBoxResetKeys()
    self.statusBoxEnabled = False
    if commsActive:
        logging.debug("statusBox emulation disabled")
def statusBoxResetKeys(self):
    """Reset the statusBox key mappings to no mapping.

    Every entry of ``self.statusButtonMap`` (the table written by
    statusBoxAddKeys and statusBoxEnable, indexed by status input
    position) is set to 99, the value used for an unmapped input.

    ``self.statusButtons`` is also reset to 23 placeholder entries, as
    before.
    NOTE(review): statusButtons is otherwise used to cache the last
    responses read from the queue; resetting it here is kept only for
    backward compatibility - confirm whether anything relies on it.
    """
    # The mapping table is what has to be cleared for a "reset"; the
    # previous code only touched statusButtons, so old mappings
    # survived statusBoxSetKeys() and statusBoxDisable().
    self.statusButtonMap = [99]*23
    self.statusButtons = [99]*23
[docs]
def statusBoxSetKeys(self, map):
    """Set key mappings: call statusBoxResetKeys(), then add the new
    mappings with statusBoxAddKeys().

    statusBox events can be mapped to a number of physical events on
    Bits#: digital input lines, triggers, CB6 IR input channels and
    analog inputs.

    ``map`` is a list of tuples, each holding the name of the logical
    button to be mapped and its source, e.g. ``('btn1', 'Din1')`` maps
    physical input Din1 to logical button btn1.

    Example::

        bits.statusBoxSetKeys([('btn1', 'Din0'), ('btn2', 'IRButtonA')])

    will link physical Din0 to logical button 1 and IRButtonA to
    button 2.

    To match the CRS hardware description inputs are labelled:
    TrigIn, Din0 ... Din9, IRButtonA ... IRButtonF,
    AnalogIn1 ... AnalogIn6.
    Logical buttons are numbered from 1 to 23.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    # Reset first, then layer the requested mappings on top.
    self.statusBoxResetKeys()
    newMappings = map
    self.statusBoxAddKeys(newMappings)
[docs]
def statusBoxAddKeys(self, map):
    """Add key mappings to an existing map.

    statusBox events can be mapped to a number of physical events on
    Bits#: digital input lines, triggers, CB6 IR input channels and
    analog inputs.

    ``map`` is a list of tuples, each holding the name of the logical
    button to be mapped and its source, e.g. ``('btn1', 'Din1')`` maps
    physical input Din1 to logical button btn1.

    statusBox has 23 logical buttons (btn1-23).

    Unlike RTBox, buttons/events can only be partially mapped to
    multiple physical inputs: a logical button can be mapped to more
    than one physical input, but a physical input can only be mapped to
    one logical button. So this function overwrites any existing
    mapping if the physical input is the same.

    Example::

        bits.statusBoxSetKeys([('btn1', 'Din0'), ('btn2', 'Din1')])
        bits.statusBoxAddKeys([('btn1', 'IRButtonA'),
                               ('btn2', 'IRButtonB')])

    will link Din0 to button 1 and Din1 to button 2, then add IRButtonA
    and IRButtonB alongside the original mappings. Now both hard wired
    and IR inputs will emulate the same logical button press.

    To match the CRS hardware description inputs are labelled:
    TrigIn, Din0 ... Din9, IRButtonA ... IRButtonF,
    AnalogIn1 ... AnalogIn6.
    Logical buttons are numbered from 1 to 23.

    Raises ValueError if a source name does not start with one of the
    recognised input types.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    for mapping in map:
        # 'btnNN' -> NN; take every digit so that btn10-btn23 work.
        btn = int(mapping[0][3:])
        inputName = mapping[1]
        inputType = inputName[0]
        if inputType == 'I':  # IR input
            # A = 65, so IRButtonA is status input 11, B is 12, etc.
            source = ord(inputName[-1]) - 65 + 11
        elif inputType == 'D':  # Digital input: Din0 is input 1
            source = int(inputName[-1]) + 1
        elif inputType == 'T':  # Trigger input
            source = 0
        elif inputType == 'A':  # Analog input: AnalogIn1 is input 17
            source = int(inputName[-1]) + 16
        else:
            raise ValueError("Unrecognised statusBox input %r"
                             % (inputName,))
        self.statusButtonMap[source] = btn
[docs]
def getStatusBoxResponses(self, N=1):
    """Read ``N`` statusBox events if at least that many are queued.

    Checks that ``self.statusQ`` holds at least ``N`` entries, takes
    ``N`` of them, then discards everything else left on the queue.
    Returns ``None`` when there are no communications or when fewer
    than ``N`` events are queued.

    The events read are also stored in ``self.statusButtons`` and
    their count in ``self.nStatusPresses``.

    Each entry is expected to be a dict like object with three
    members, 'button', 'dir' and 'time': the logical button number,
    the direction of the event (e.g. 'up' or 'down') and the timestamp
    associated with the event.

    Values can be read as a list of structures, e.g.::

        res = getStatusBoxResponses(3)
        print(res[0].dir, res[0].button, res[0].time)

    or dictionaries::

        print(res[0]['dir'], res[0]['button'], res[0]['time'])

    Note that even if only 1 key press was requested a list of
    dict / objects is returned.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    # With no comms there will be nothing to read.
    if self.noComms:
        return None
    if self.statusQ.qsize() <= N - 1:
        return None
    presses = [button() for _ in range(N)]
    # Take the required number of button presses off the queue ...
    for slot in range(N):
        presses[slot] = self.statusQ.get()
    # ... and empty the queue of everything else.
    while not self.statusQ.empty():
        self.statusQ.get()
    self.nStatusPresses = N
    self.statusButtons = presses
    return presses
[docs]
def getStatusBoxResponse(self):
    """Read one statusBox style key press from the event queue.

    Returns a dict like object with three members, 'button', 'dir' and
    'time', or ``None`` when no event could be read on this call
    (including when there are no communications).

    The result of this call's own read is used, so an event returned
    by an earlier call is never reported a second time.

    'button' is the logical button number of the event, 'dir' is the
    direction of the event (e.g. 'up' or 'down') and 'time' is the
    timestamp associated with the event.

    The value can be read as a structure, e.g.::

        res = getStatusBoxResponse()
        res.dir, res.button, res.time

    or as a dictionary::

        res['dir'], res['button'], res['time']

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    responses = self.getStatusBoxResponses(1)
    # getStatusBoxResponses returns None when nothing could be read; do
    # not fall back on self.statusButtons, which holds either the
    # previous read or the placeholder values written by a key reset.
    if not responses:
        return None
    return responses[0]
[docs]
def getAllStatusBoxResponses(self):
    """Read all of the statusBox style key presses that are queued.

    The current size of ``self.statusQ`` is used as the number of
    events to request from getStatusBoxResponses().

    Returns a list of dict like objects with three members, 'button',
    'dir' and 'time': the logical button number, the direction of the
    event (e.g. 'up' or 'down') and the timestamp associated with the
    event.

    Values can be read as a structure, e.g.::

        res = getAllStatusBoxResponses()
        res[0].dir, res[0].button, res[0].time

    or as a dictionary::

        res[0]['dir'], res[0]['button'], res[0]['time']

    Note that even if only 1 key press was found a list of
    dict / objects is returned.

    Note that the firmware in Bits# units varies over time and some
    features of this class may not work for all firmware versions.
    Bits# units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.
    """
    queuedEvents = self.statusQ.qsize()
    return self.getStatusBoxResponses(queuedEvents)
[docs]
def statusBoxKeysPressed(self,N=1):
    """Report whether at least N statusBox events are waiting.

    Returns True when the event queue holds N or more entries and
    False otherwise. Nothing is removed from the queue.

    Example::

        bits.statusBoxKeysPressed(5)

    will return False until 5 button presses have been recorded.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    queued = self.statusQ.qsize()
    return queued > N - 1
[docs]
def statusBoxWaitN(self, N=1):
    """Block until at least N statusBox events have been recorded.

    Program execution is paused (the queue is polled in a tight loop)
    until the event queue holds N entries; the result of
    `getStatusBoxResponses(N)` is then returned. Returns None
    immediately when the device has no comms (`self.noComms`).

    Example::

        res = bits.statusBoxWaitN(5)

    will suspend all other activity until 5 button presses have been
    recorded and will then return a list of dicts with the 5 results.

    Results can be accessed as a structure::

        res[0].dir, res[0].button, res[0].time

    or as a dictionary::

        res[0]['dir'], res[0]['button'], res[0]['time']

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    if self.noComms:
        return None
    # Spin until the collecting thread has queued enough events.
    while self.statusQ.qsize() < N:
        pass
    return self.getStatusBoxResponses(N)
[docs]
def statusBoxWait(self):
    """Block until at least one statusBox event has been recorded.

    Program execution is paused (the queue is polled in a tight loop)
    until the event queue is no longer empty; the result of
    `getStatusBoxResponse()` is then returned. Returns None
    immediately when the device has no comms (`self.noComms`).

    Example::

        res = bits.statusBoxWait()

    will suspend all other activity until 1 button press has been
    recorded and will then return a dict / structure with the result.

    Results can be accessed as a structure::

        res.dir, res.button, res.time

    or as a dictionary::

        res['dir'], res['button'], res['time']

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    if self.noComms:
        return None
    # Spin until the collecting thread has queued something.
    while self.statusQ.empty():
        pass
    return self.getStatusBoxResponse()
#====================================================================#
# Helper function to run in its own thread detecting statusBox #
# events and putting them in a queue #
#====================================================================#
def _statusBox(self):
    """Collect statusBox events from the Bits# status stream.

    Should not normally be called by the user. It is run in its own
    thread via self.statusBoxEnable() so that collection is asynchronous.

    Loops until self.statusBoxEnd is set (it can be set from outside
    this function, e.g. by self.statusBoxDisable()). On each pass, if
    at least self._statusSize characters are waiting on the serial
    port they are read and split into status lines on CR. The final
    fragment after the last CR is discarded as it may be incomplete.

    The first '#sample' line only seeds the record of input states.
    Every later '#sample' line is compared with that record:

    - fields 3..19 are treated as 17 digital sources; a change to 0 is
      a 'down' event and a change to 1 is an 'up' event;
    - fields 20..25 are treated as 6 analog sources; a change larger
      than self.statusBoxThreshold is an event in the direction of
      the change.

    Only sources whose self.statusButtonMap entry is below 24 (mapped)
    and directions named in self.statusBoxMode produce events. Each
    event is put on self.statusQ as a `button` object carrying
    'button', 'time' (field 2 of the status line) and 'dir'.

    Lines starting with '$touch', and lines that cannot be
    interpreted, are skipped with a logged warning.

    When the loop ends the device is told to stop sending status
    reports and the input buffer is flushed.
    """
    def _queueEvent(sourse, timeStamp, direction):
        # Build a fresh record for every event. Re-using one record per
        # status line meant that when several inputs changed in the same
        # line the queue held the same object several times, so every
        # entry ended up describing the last input only.
        evt = button()
        evt.button = self.statusButtonMap[sourse]
        evt.time = float(timeStamp)
        evt.dir = direction
        self.statusQ.put(evt)

    firstIn = True  # the very first status entry only seeds the record
    log = None  # last recorded state of every field of a status line
    # Continue reading data until statusBoxEnd is set.
    # Note when used in a thread it can be set from outside this function.
    while self.statusBoxEnd == False:
        self.com.timeout = 0.01
        nChars = self._inWaiting()
        if nChars < self._statusSize:
            continue  # not enough data for a status report yet
        # use self.com.read() to get exact number of chars
        raw = self.com.read(nChars)
        msg = raw.decode("utf-8")
        # just in case split message into status lines marked by CR;
        # the fragment after the last CR is ignored
        for line in msg.split('\r')[:-1]:
            # split line into component parts marked by ;
            v = line.split(';')
            if v[0] == '#sample':  # Check we have read a status line
                if firstIn:
                    log = deepcopy(v)  # make a copy of status entry
                    firstIn = False
                    continue
                wantDown = ('Down' in self.statusBoxMode
                            or 'down' in self.statusBoxMode)
                wantUp = ('Up' in self.statusBoxMode
                          or 'up' in self.statusBoxMode)
                # Check digital inputs
                for sourse in range(17):
                    field = 3 + sourse
                    # if input has changed and input is mapped
                    if (v[field] != log[field]
                            and self.statusButtonMap[sourse] < 24):
                        if wantDown and int(v[field]) == 0:
                            _queueEvent(sourse, v[2], 'down')
                        if wantUp and int(v[field]) == 1:
                            _queueEvent(sourse, v[2], 'up')
                        # save new state of sourse
                        log[field] = v[field]
                # Check analog inputs
                for analog in range(6):
                    sourse = analog + 17
                    field = 3 + sourse
                    fLog = float(log[field])
                    fVal = float(v[field])
                    # if input has changed enough and input is mapped
                    if (abs(fVal - fLog) > self.statusBoxThreshold
                            and self.statusButtonMap[sourse] < 24):
                        if wantDown and fLog > fVal:
                            _queueEvent(sourse, v[2], 'down')
                        if wantUp and fVal > fLog:
                            _queueEvent(sourse, v[2], 'up')
                        # save new state of sourse
                        # NOTE(review): the record is only advanced once
                        # a change has been detected - confirm this is the
                        # intended thresholding behaviour.
                        log[field] = v[field]
            elif v[0] == '$touch':
                # We've read a screen touch event by mistake.
                warning = ("_statusBox found touch"
                           " data on input so skipping that")
                logging.warning(warning)
            else:
                # We've read something we can't interpret.
                warning = ("_statusBox found unknown data"
                           " on input so skipping that")
                logging.warning(warning)
    # clean up when stop is called.
    self._statusDisable()  # Send stop signal to CRS device to shut it up.
    self.statusBoxEnd = True  # Confirm that data logging has ended.
    self.flush()
#====================================================================#
# 'status' functions use BitsSharp status reporting to read the #
# digital IO, IR channels #
# and analog inputs via a separate thread #
#====================================================================#
[docs]
def setStatusEventParams(self, DINBase=0b1111111111,
                         IRBase=0b111111,
                         TrigInBase=0,
                         ADCBase=0,
                         threshold=9999.99,
                         mode=['up','down']):
    """Set the parameters used to decide whether a status value
    represents a reportable event.

    DINBase = a 10 bit binary word specifying the expected starting
        values of the 10 digital input lines.

    IRBase = a 6 bit binary word specifying the expected starting
        values of the 6 CB6 IR buttons.

    TrigInBase = the starting value of the Trigger input.

    ADCBase = the starting value used for all of the ADC inputs.

    threshold = the change in an ADC value that counts as an event.

    mode = a list of event types to monitor, can be 'up' or 'down'.
        Typically 'down' corresponds to a button press or when the
        input is being pulled down to zero volts.

    Example::

        bits.setStatusEventParams(DINBase=0b1111111111,
                                  IRBase=0b111111,
                                  TrigInBase=0,
                                  ADCBase=0,
                                  threshold=3.4,
                                  mode=['down'])
        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        res = bits.getStatusEvent(0)
        print(res.time)

    This will start the event extraction process as if DINs and IRs
    are all '1', Trigger is '0' and ADCs = 0, with an ADC threshold for
    change of 3.4 volts, and will only register 'down' events. Here we
    display the time stamp of the first event.

    Note: device firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    self.statusDINBase = DINBase  # Initial values for digital ins
    self.statusIRBase = IRBase  # Initial values for CB6 IR box
    self.statusTrigInBase = TrigInBase  # Initial values for TrigIn
    self.statusADCBase = ADCBase  # Initial value for all ADCs
    self.statusThreshold = threshold  # Threshold for ADC events
    # Direction of events to be reported. A list is copied so that the
    # instance never shares (and can never corrupt) the default argument
    # or the caller's own list; other values are stored unchanged.
    self.statusMode = list(mode) if isinstance(mode, list) else mode
[docs]
def pollStatus(self, t=0.0001):
    """Read the status reports from the Bits# for a (usually short)
    time period t and wait for the reading to finish.

    The script waits for the logging thread to complete, so this is
    not ideal for time critical applications. Values of t below 0.01
    are treated by the logging thread as a one-shot read.
    If you don't want to wait while this does its job use
    startStatusLog and stopStatusLog instead.

    Fills the statusValues list with all the status values read
    during the time period, and the statusEvents list with just those
    status values that are likely to be meaningful events.

    statusValues will contain dict like objects of the style::

        sample, time, trigIn, DIN[10], DWORD, IR[6], ADC[6]

    They can be accessed as statusValues[i]['sample'] or
    statusValues[i].sample, statusValues[x].ADC[j].

    Example::

        bits.pollStatus()
        print(bits.statusValues[0].IR[0])

    will display the value of the IR InputA in the first sample
    recorded.

    Note: starts and stops logging for itself.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    # Launch the logging thread and block until it has finished.
    self.startStatusLog(t)
    self.statusThread.join()

    # Make sure the device is quiet, then harvest what was logged.
    self._statusDisable()
    self._getStatusLog()
    self._extractStatusEvents()
    self.flush()

    # The finished thread object is of no further use.
    del self.statusThread
[docs]
def startStatusLog(self, t=60):
    """Start logging data from the Bits#.

    Starts data logging in its own thread (running self._statusLog).
    Will run for t seconds, default 60, or until stopStatusLog() is
    called.

    Raises AssertionError if the statusBox or the RTBox is currently
    enabled, as the status log cannot be used at the same time.

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    if self.statusBoxEnabled:
        raise AssertionError("Cannot use status log when statusBox is on ")
    if self.RTBoxEnabled:
        raise AssertionError("Cannot use status log when RTBox is on ")
    # args must be a tuple. Constructing a Thread does not fail on its
    # arguments, so the former fallback to args=(t) could never run (and
    # a bare float would have failed when the thread started).
    self.statusThread = threading.Thread(target=self._statusLog,
                                         args=(t,))
    self.statusEnd = False  # cleared before the thread starts polling it
    self._statusEnable()
    self.statusThread.start()
[docs]
def stopStatusLog(self):
    """Stop logging data from the Bits# and extract the raw status
    values and significant events into statusValues and statusEvents.

    statusValues will contain dict like objects of the style:
    `sample, time, trigIn, DIN[10], DWORD, IR[6], ADC[6]`.
    They can be accessed as statusValues[i]['sample'] or
    statusValues[i].sample, statusValues[x].ADC[j].

    statusEvents will contain dict like objects describing the
    source, input, direction and time of each event. The data can be
    accessed as statusEvents[i]['time'] or statusEvents[i].time.

    Waits for the logging thread to finish properly, so this can
    introduce a timing delay.

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        print(bits.statusValues[0].time)
        print(bits.statusEvents[0].time)

    will display the time stamps of the first status value recorded
    and of the first meaningful event.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    # Raising this flag asks the status logging thread to stop.
    self.statusEnd = True
    # Wait here until the thread has wound down.
    self.statusThread.join()

    # Pull the status values off the queue and derive the events.
    self._getStatusLog()
    self._extractStatusEvents()

    del self.statusThread
[docs]
def getAllStatusEvents(self):
    """Return the whole status event list (self.statusEvents).

    The list holds dictionary like objects with the entries
    source, input, dir and time:

    source = the general source of the event - e.g.
        DIN for Digital input,
        IR for CB6 IR response box events.

    input = the individual input in the source.

    dir = the direction of the event, 'up' or 'down'.

    time = time stamp.

    All sources are numbered from zero::

        Din 0 ... 9
        IR  0 ... 5
        ADC 0 ... 5

    Which directions are captured depends on the mode given to
    setStatusEventParams, e.g. 'up' will only report up events.

    The data can be accessed as value[i]['time'] or value[i].time.

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        res = bits.getAllStatusEvents()
        print(res[0].time)

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    allEvents = self.statusEvents
    return allEvents
[docs]
def getStatusEvent(self, N=0):
    """Pull out the Nth event from the status event list.

    Returns None when N is not below the number of recorded events
    (self.status_nEvents). Otherwise returns a dictionary like object
    with the entries source, input, dir and time:

    source = the general source of the event - e.g.
        DIN for Digital input,
        IR for the IR response box.

    input = the individual input in the source.

    dir = the direction of the event, 'up' or 'down'.

    time = time stamp.

    All sources are numbered from zero::

        Din 0 ... 9
        IR  0 ... 5
        ADC 0 ... 5

    Which directions are captured depends on the mode given to
    setStatusEventParams, e.g. 'up' will only report up events.

    The data can be accessed as value['time'] or value.time.

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        res = bits.getStatusEvent(20)
        print(res.time)

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    return self.statusEvents[N] if N < self.status_nEvents else None
[docs]
def getAllStatusValues(self):
    """Return the whole status values list (self.statusValues).

    The list holds dict like objects with the entries::

        sample, time, trigIn, DIN[10], DWORD, IR[6], ADC[6]

    sample is the sample ID number.

    time is the time stamp.

    trigIn is the value of the trigger input.

    DIN is a list of 10 digital input values.

    DWORD represents the digital inputs as a single decimal value.

    IR is a list of 6 infra-red (IR) input values.

    ADC is a list of 6 analog input values.

    These can be accessed as value[i]['sample'] or value[i].sample,
    values[i].ADC[j].

    All sources are numbered from zero::

        Din 0 ... 9
        IR  0 ... 5
        ADC 0 ... 5

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        res = bits.getAllStatusValues()
        print(res[0].time)

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    allValues = self.statusValues
    return allValues
[docs]
def getStatus(self,N=0):
    """Pull out the Nth entry in the statusValues list.

    Returns None when N is not below the number of recorded values
    (self.status_nValues). Otherwise returns a dict like object with
    the entries::

        sample, time, trigIn, DIN[10], DWORD, IR[6], ADC[6]

    sample is the sample ID number.

    time is the time stamp.

    trigIn is the value of the trigger input.

    DIN is a list of 10 digital input values.

    DWORD represents the digital inputs as a single decimal value.

    IR is a list of 6 infra-red (IR) input values.

    ADC is a list of 6 analog input values.

    These can be accessed as value['sample'] or value.sample,
    value.ADC[j].

    All sources are numbered from zero::

        Din 0 ... 9
        IR  0 ... 5
        ADC 0 ... 5

    Example::

        bits.startStatusLog()
        while not event:
            # do some processing
            continue
        bits.stopStatusLog()
        res = bits.getStatus(20)
        print(res.time)

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    return self.statusValues[N] if N < self.status_nValues else None
[docs]
def getAnalog(self,N=0):
    """Pull out the values of the analog inputs for the Nth status
    entry.

    Returns a dictionary with a list of 6 floats ('ADC') and a time
    stamp ('time'), or None if getStatus(N) yields nothing.

    All sources are numbered from zero::

        ADC 0 ... 5

    Example::

        bits.pollStatus()
        res = bits.getAnalog()
        print(res['ADC'])

    will poll the status and display the values of the ADC inputs in
    the first status entry returned.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    entry = self.getStatus(N)
    if not entry:
        return None
    return {'ADC': entry.ADC, 'time': entry.time}
[docs]
def getDigital(self,N=0):
    """Pull out the values of the digital inputs for the Nth status
    entry.

    Returns a dictionary with a list of 10 ints that are 1 or 0
    ('DIN') and a time stamp ('time'), or None if getStatus(N) yields
    nothing.

    All sources are numbered from zero::

        Din 0 ... 9

    Example::

        bits.pollStatus()
        res = bits.getDigital()
        print(res['DIN'])

    will poll the status and display the values of the digital inputs
    in the first status entry returned.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    entry = self.getStatus(N)
    if not entry:
        return None
    return {'DIN': entry.DIN, 'time': entry.time}
[docs]
def getDigitalWord(self,N=0):
    """Pull out the values of the digital inputs, as a single word,
    for the Nth status entry.

    Returns a dictionary with a 10 bit word representing the binary
    values of those inputs ('DWORD') and a time stamp ('time'), or
    None if getStatus(N) yields nothing.

    Example::

        bits.pollStatus()
        res = bits.getDigitalWord()
        print(res['DWORD'])

    will poll the status and display the value of the digital inputs
    as a decimal number.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    entry = self.getStatus(N)
    if not entry:
        return None
    return {'DWORD': entry.DWORD, 'time': entry.time}
[docs]
def getTrigIn(self,N=0):
    """Pull out the value of the trigger input for the Nth status
    entry.

    Returns a dictionary with a 0 or 1 ('trigIn') and a time stamp
    ('time'), or None if getStatus(N) yields nothing.

    Example::

        bits.pollStatus()
        res = bits.getTrigIn()
        print(res['trigIn'])

    will poll the status and display the value of the trigger input.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    entry = self.getStatus(N)
    if not entry:
        return None
    return {'trigIn': entry.trigIn, 'time': entry.time}
[docs]
def getIRBox(self,N=0):
    """Pull out the values of the CB6 IR response box inputs for the
    Nth status entry.

    Returns a dictionary with a list of 6 ints that are 1 or 0
    ('IRBox') and a time stamp ('time'), or None if getStatus(N)
    yields nothing.

    All sources are numbered from zero::

        IR 0 ... 5

    Example::

        bits.pollStatus()
        res = bits.getIRBox()
        print(res['IRBox'])

    will poll the status and display the values of the IR box buttons
    in the first status entry returned.

    Note: Bits# firmware varies between units and the device can be
    reconfigured through its config.xml file. This class assumes that
    all digital inputs, triggers and analog inputs are reported in the
    status updates; if some are disabled the 'status' and 'event'
    commands may not work.
    """
    entry = self.getStatus(N)
    if not entry:
        return None
    return {'IRBox': entry.IR, 'time': entry.time}
#=============================================================#
# Helper functions for the main status functions. #
# Not normally needed by the user. #
#=============================================================#
def _statusEnable(self):
    """Set the Bits# to continuously send back its status until
    stopped, and mark status reporting as enabled.

    You get a lot of data by leaving this going.
    Raises AssertionError if the RTBox is currently enabled.

    Not normally needed by the user.
    """
    if self.RTBoxEnabled:
        raise AssertionError("Cannot use status log when RTBox is on ")
    # send start message to CRS device
    self.sendMessage(b'$Start\r')
    self.statusEnabled = True
def _statusDisable(self):
    """Stop the Bits# from sending status data, mark status reporting
    as disabled and flush the buffer.

    Not normally needed by the user.
    """
    # send stop message to CRS device
    stopCommand = b'$Stop\r'
    self.sendMessage(stopCommand)
    self.statusEnabled = False
    self.flush()
def _statusLog(self,args=60):
    """Read status reports from the Bits# and queue the parsed values.

    Should not normally be called by the user. Called in its own
    thread via self.startStatusLog() so that actions can be
    asynchronous.

    args specifies the time in seconds over which to record status
    reports (default 60). Reading stops earlier if self.statusEnd is
    set, e.g. by self.stopStatusLog(). A time below 0.01 is treated as
    a one-shot read: the time is raised to 10 ms and reading stops as
    soon as more than self._statusSize characters have been received.

    After reading, the device is told to stop sending, the message is
    split into lines on CR and every '#sample' line is parsed into a
    `status` object (sample, time, trigIn, DIN[10], DWORD, IR[6],
    ADC[6]) that is put on self.statusQ. The last line is ignored as
    it can be bogus. Lines starting with '$touch', and lines that
    cannot be interpreted, are skipped with a logged warning.

    Note that a '#sample' line with missing or non numeric fields
    will raise an error.
    """
    t = args  # Get the time to run for
    # But if very short treat as a 1-shot read.
    oneshot = t < 0.01
    if oneshot:
        t = 0.01
    sT = clock()  # start time
    msg = ""
    # Continue reading data until sample time is up or statusEnd is set.
    # Note when used in a thread statusEnd can be set from outside this
    # function.
    while (clock() - sT < t) and (self.statusEnd == False):
        smsg = self.read(timeout=0.1)
        # Compile message strings
        if smsg:
            msg = msg + smsg.decode("utf-8")
        # Stop if we have 1 whole status string in one shot mode
        if len(msg) > self._statusSize and oneshot:
            self.statusEnd = True
    self._statusDisable()  # Send stop signal to CRS device to shut it up.
    self.statusEnd = True  # Confirm that data logging has ended.
    if not msg:
        return  # nothing was received
    # Split message into status lines marked by CR and
    # ignore the last line as it is likely to be incomplete.
    for line in msg.split('\r')[:-1]:
        v = line.split(';')  # split line into parts marked by ;
        if v[0] == '#sample':  # Check we have read a status line
            # Now strip out status values into a structure.
            entry = status()
            entry.sample = int(float(v[1]))
            entry.time = float(v[2])
            entry.trigIn = int(float(v[3]))
            dword = 0
            for j in range(10):  # 10 digital inputs
                entry.DIN[j] = int(float(v[4+j]))
                dword = dword + (2**j) * entry.DIN[j]
            entry.DWORD = dword
            for j in range(6):  # 6 IR buttons
                entry.IR[j] = int(float(v[14+j]))
            for j in range(6):  # 6 Analog inputs
                entry.ADC[j] = float(v[20+j])
            # Put the structure onto the queue.
            self.statusQ.put(entry)
        elif v[0] == '$touch':
            # We've read a screen touch event by mistake.
            warning = ("_statusLog found touch"
                       " data on input so skipping that")
            logging.warning(warning)
        else:
            # We've read something we can't interpret. This warning used
            # to be built but never logged, so such lines vanished
            # without trace.
            warning = ("_statusLog found unknown data"
                       " on input so skipping that")
            logging.warning(warning)
def _getStatusLog(self):
    """Read the log Queue into self.statusValues.

    Should not be needed by the user if start/stopStatusLog or
    pollStatus are used.

    Fills statusValues with the objects currently on self.statusQ, in
    queue order. As queued by the status logging thread these are
    dictionary like objects with the entries::

        sample, time, trigIn, DIN[10], DWORD, IR[6], ADC[6]

    They can be accessed as statusValues[i]['sample'] or
    statusValues[i].sample, statusValues[i].ADC[j].

    Also sets status_nValues to the number of values recorded. When
    the queue is empty statusValues becomes an empty list, so that
    values (and the events derived from them) left over from an
    earlier log are never reported again.
    """
    # Take exactly the entries that are waiting now off the queue.
    pending = self.statusQ.qsize()
    self.statusValues = [self.statusQ.get() for _ in range(pending)]
    self.status_nValues = len(self.statusValues)
def _extractStatusEvents(self):
    """Interpret the values from the status log to pull out any events.

    Should not be needed by the user if start/stopStatusLog or
    pollStatus are used.

    Fills statusEvents with a list of dictionary like objects with
    the following entries:

    source = the general source of the event: 'DIN' for a digital
        input, 'IR' for an IR response box button, 'ADC' for an
        analog input or 'Trigger' for the trigger input.

    input = the individual input in the source.

    dir = the direction of the event, 'up' or 'down'.

    time = time stamp of the status value in which the change was seen.

    Events are recorded relative to these settings:

    statusDINBase, initial values for digital ins.
    statusIRBase, initial values for CB6 IR box.
    statusTrigInBase, initial value for TrigIn.
    statusADCBase, initial value for all ADCs.
    statusThreshold, change in an ADC value that counts as an event.
    statusMode, direction(s) of events to be reported.

    Changes in a direction that is not in statusMode are not reported
    but still move the tracked state of that input.

    The data can be accessed as statusEvents[i]['time'] or
    statusEvents[i].time.

    Also sets status_nEvents to the number of events recorded.
    """
    def _record(source, channel, direction, sample):
        # Append an event, but only if that direction is being recorded.
        if (direction in self.statusMode
                or direction.capitalize() in self.statusMode):
            evt = event()
            evt.source = source
            evt.input = channel
            evt.dir = direction
            evt.time = sample.time
            self.statusEvents.append(evt)

    # Expand the base settings into one tracked state per input; the
    # DIN and IR bases are bit words with bit i describing input i.
    adcState = [self.statusADCBase] * 6
    dinState = [1 if (2**bit) & self.statusDINBase else 0
                for bit in range(10)]
    irState = [1 if (2**bit) & self.statusIRBase else 0
               for bit in range(6)]
    trigState = self.statusTrigInBase

    samples = self.statusValues
    self.statusEvents = []
    for sample in samples:
        DIN = sample.DIN
        IR = sample.IR
        Trig = sample.trigIn
        ADC = sample.ADC

        # The 10 digital inputs.
        for j in range(10):
            if DIN[j] == 0 and dinState[j] == 1:  # high to low
                _record('DIN', j, 'down', sample)
                dinState[j] = 0  # follow the current input state
            if DIN[j] == 1 and dinState[j] == 0:  # low to high
                _record('DIN', j, 'up', sample)
                dinState[j] = 1  # follow the current input state

        # The 6 IR buttons.
        for j in range(6):
            if IR[j] == 0 and irState[j] == 1:  # high to low
                _record('IR', j, 'down', sample)
                irState[j] = 0  # follow the current input state
            if IR[j] == 1 and irState[j] == 0:  # low to high
                _record('IR', j, 'up', sample)
                irState[j] = 1  # follow the current input state

        # The 6 analog inputs: only changes beyond the threshold count.
        for j in range(6):
            if adcState[j] - ADC[j] > self.statusThreshold:  # fell
                _record('ADC', j, 'down', sample)
                adcState[j] = ADC[j]
            if ADC[j] - adcState[j] > self.statusThreshold:  # rose
                _record('ADC', j, 'up', sample)
                adcState[j] = ADC[j]

        # The trigger input.
        if Trig == 0 and trigState == 1:  # high to low
            _record('Trigger', 0, 'down', sample)
            trigState = 0  # follow the current input state
        if Trig == 1 and trigState == 0:  # low to high
            _record('Trigger', 0, 'up', sample)
            trigState = 1  # follow the current input state

    self.status_nEvents = len(self.statusEvents)
#=======================#
# Other functions #
#=======================#
# TO DO: The following are either not yet implemented (or not tested)
[docs]
def start(self):
    """[Not currently implemented] Begin event collection by the device.

    Placeholder only: it is not really needed because other members of
    the class now start collection themselves.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
[docs]
def stop(self):
    """[Not currently implemented] End event collection by the device.

    Placeholder only: it is not really needed because other members of
    the class now stop collection themselves.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
[docs]
def checkConfig(self, level=1, demoMode=False, logFile=''):
    """Check whether a configuration exists for this device and is correct.

    Parameters
    ----------
    level: integer
        - 0: run no checks (a config object is still created if absent)
        - 1: check that we have a config file and that the graphics
          card and operating system match those stored in it, then
          assume the stored identity LUT is correct
        - 2: test whether the current LUT behaves as an identity LUT
        - 3: force a fresh search for the identity LUT
        A failed check at one level escalates to the next one.
    demoMode: bool
        Passed on to the config tests; forced to True when the device
        has no comms (``self.noComms``).
    logFile: str
        Passed on to ``Config.findIdentityLUT`` at level 3.

    Returns
    -------
    1 when a level 1 or level 2 check succeeds (or level 2 is run in
    demo mode), the result of ``findIdentityLUT`` when level 3 is
    reached, otherwise False.
    """
    demoMode = True if self.noComms else demoMode
    modeOnEntry = self.mode
    # fetch a config the first time we are asked
    if not self.config:
        self.config = Config(self)
    result = False  # until we find otherwise

    if level == 1:
        result = self.config.quickCheck()
        if result:
            # same graphics card and OS as when the LUT was stored
            self.mode = modeOnEntry
            self.win.gammaRamp = self.config.identityLUT
            template = "Bits# config matches current system: %s on %s"
            logging.info(template % (self.config.gfxCard, self.config.os))
            return 1
        # didn't match our graphics card or OS, so run the real test
        level = 2
        self._warnTesting()

    if level == 2:
        errs = self.config.testLUT(demoMode=demoMode)
        if demoMode:
            return 1
        if (errs**2).sum() == 0:
            # the ramp currently on the card is an identity LUT: keep it
            self.config.identityLUT = self.win.backend.getGammaRamp().transpose()
            self.config.save()
            self.mode = modeOnEntry
            logging.info("We found a LUT and it worked as identity")
            return 1
        level = 3
        logging.info("The current LUT didn't work as identity. "
                     "We'll try to find a working one.")

    if level == 3:
        result = self.config.findIdentityLUT(demoMode=demoMode,
                                             logFile=logFile)
    self.mode = modeOnEntry
    return result
def _warnTesting(self):
    """Warn the user that graphics card tests are about to be run.

    The message is printed to stdout and also drawn in ``self.win``,
    where it stays for one second before the window is flipped again.
    """
    # `visual` is not among the module-level imports visible at the top
    # of this file, so import it where it is used.
    from psychopy import visual

    msg = ("We need to run some tests on your graphics card (hopefully "
           "just once).\nThe BitsSharp will go into status mode while "
           "this is done.\nIt can take a minute or two...")
    print(msg)
    sys.stdout.flush()
    msgOnScreen = visual.TextStim(self.win, msg)
    msgOnScreen.draw()
    self.win.flip()
    core.wait(1.0)
    self.win.flip()
# properties that need a weak ref to avoid circular references
@property
def win(self):
    """The window that this box is attached to.

    Only a weak reference to the window is stored (to avoid circular
    references); None is returned when no window has been set.
    """
    ref = self.__dict__.get('win')
    return None if ref is None else ref()


@win.setter
def win(self, win):
    # keep only a weak reference to the window
    self.__dict__['win'] = weakref.ref(win)
[docs]
class DisplayPlusPlus(BitsSharp):
    """A Display++ is a Bits# box inside an LCD monitor, so this class
    is just a wrapper that reminds people with a Display++ that
    they are using it.

    Unlike the Bits#, a Display++ may not have any analog connections
    unless that option was purchased. Everything in the Bits# class
    should work, but any analog values sent or read will be spurious
    unless the analog hardware is installed.

    Note that the firmware in Display++ units varies over time and some
    features of this class may not work for all firmware versions.

    Display++ units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status
    updates. If some of these reports are disabled in your config.xml
    file then 'status' and 'event' commands in this class may not work.

    RTBox commands that reset the key mapping have been found not to
    work on some firmware.
    """

    name = b'CRS Display++'

    def __init__(self,
                 win=None,
                 portName=None,
                 mode='',
                 checkConfigLevel=1,
                 gammaCorrect='hardware',
                 gamma=None,
                 noComms=False):
        """Forward every argument, positionally and in order, to the
        parent class initialiser.
        """
        parent = super(DisplayPlusPlus, self)
        parent.__init__(win, portName, mode, checkConfigLevel,
                        gammaCorrect, gamma, noComms)

    def __del__(self):
        """Hand clean-up over to the parent class."""
        super(DisplayPlusPlus, self).__del__()
[docs]
class DisplayPlusPlusTouch(DisplayPlusPlus):
    """A Display++ is a Bits# box inside an LCD monitor, but it is
    also possible to add a touch screen option to a Display++
    and this class gives access to that.

    Otherwise this class is just a wrapper to the Bits# class.
    Unlike the Bits#, a Display++ may not have any analog
    connections unless that option was purchased.
    Everything in the Bits# class should work, but any
    analog values sent or read will be spurious unless
    the analog hardware is installed.

    Note that the firmware in Display++ units varies over time and some
    features of this class may not work for all firmware versions.

    Display++ units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status
    updates. If some of these reports are disabled in your config.xml
    file then 'status' and 'event' commands in this class may not work.

    RTBox commands that reset the key mapping have been found not to
    work on some firmware.
    """
    name = b'CRS Display++Touch'
def __init__(self,
             win=None,
             portName=None,
             mode='',
             checkConfigLevel=1,
             gammaCorrect='hardware',
             gamma=None,
             noComms=False):
    """Initialise the parent class (all arguments are forwarded
    positionally, in order) and then set up the touch screen state.
    """
    parent = super(DisplayPlusPlusTouch, self)
    parent.__init__(win, portName, mode, checkConfigLevel,
                    gammaCorrect, gamma, noComms)

    # ---- members for controlling the touch screen ----
    # keeps track of whether the touch screen is enabled
    self.touchEnabled = False
    # set to True to kill the touch logging thread
    self.touchLogEnd = False
    # lastTouch / touchFirstTime / checkTime are used to detect a
    # possible error in the touch screen hardware
    self.lastTouch = 'released'
    self.touchFirstTime = True
    self.checkTime = 0.25
    # screen out events that are too close together in space ...
    self.touchDistance = 10
    # ... or too close together in time
    self.touchTime = 0.1
    # event directions screened in when forming the touch event list
    self.touchType = ['touched', 'released']
    # number of bytes returned by the touch screen when pressed
    self._touchSize = 31

    # ---- data members for the touch screen ----
    self.touchValues = []
    self.touchEvents = []
    self.touch_nValues = 0
    self.touch_nEvents = 0

    # queue in which raw touch screen events are stored
    self.touchQ = Queue.Queue(70000)
def __del__(self):
    """Hand clean-up over to the parent class."""
    parent = super(DisplayPlusPlusTouch, self)
    parent.__del__()
[docs]
def RTBoxEnable(self, mode=('CB6','Down','Trigger'), map=None):
    """Overload of RTBoxEnable for a Display++ with touch screen.

    Sets up the RTBox with preset or bespoke mappings and enables
    event detection. The RTBox cannot be used while the touch screen
    is enabled.

    RTBox events can be mapped to a number of physical events on Bits#:
    digital input lines, triggers and CB6 IR input channels.

    `mode` is a string or a list (or tuple) of strings containing
    keywords that select preset mappings and modes for the RTBox.
    A tuple, such as the default, is converted to a list before it is
    handed to the parent class.

    Preset mappings provided via mode:
        CB6 for the CRS CB6 IR response box.
        IO for a three button box connected to Din0-2
        IO6 for a six button box connected to Din0-5

    If mode includes 'Down' button events will be detected when pressed.
    If mode includes 'Up' button events will be detected when released.
    You can detect both types of event, but note that pulse, light and
    trigger events don't have an 'Up' mode.
    If 'Trigger' is included in mode the trigger event will be mapped
    to the trigIn connector.

    Bespoke mappings overwrite preset ones. The format for `map` is a
    list of tuples, each containing the name of the RTBox button to be
    mapped and its source, e.g. ('btn1','Din0') maps physical input
    Din0 to logical button btn1. The lowest numbered button is btn1.
    The RTBox has four logical buttons (btn1-4) and three auxiliary
    events (light, pulse and trigger). Buttons/events can be mapped to
    multiple physical inputs and stay mapped until reset.

    Example:
        bits.RTBoxEnable(mode = 'Down', map = [('btn1','Din0'), ('btn2','Din1')])
    enables the RTBox emulation to detect Down events on buttons 1
    and 2 where they are mapped to DIN0 and DIN1.

    Example:
        bits.RTBoxEnable(mode = ['Down','CB6'])
    enables the RTBox emulation to detect Down events on the standard
    CB6 IR response box keys.

    Raises AssertionError if the touch screen is currently enabled.
    """
    if self.touchEnabled:
        raise AssertionError("Cannot use RTBox when touch screen is on")
    modeForParent = list(mode) if type(mode) is tuple else mode
    super(DisplayPlusPlusTouch, self).RTBoxEnable(mode=modeForParent, map=map)
[docs]
def statusBoxEnable(self, mode=None, map=None, threshold = None):
    """Sets up the statusBox with preset or bespoke mappings
    and enables event detection. The status box cannot be used while
    the touch screen is enabled.

    statusBox events can be mapped to a number of physical events on
    Bits#: digital input lines, triggers and CB6 IR input channels.

    `mode` is a string or list of strings containing keywords that
    select preset mappings and modes for the statusBox.

    Preset mappings provided via mode:
        CB6 for the CRS CB6 IR response box mapped to btn1-6
        IO for a three button box connected to Din0-2 mapped to btn1-3
        IO6 for a six button box connected to Din0-5 mapped to btn1-6
        IO10 for a ten button box connected to Din0-9 mapped to btn1-10
        Trigger maps the trigIn to btn17
    If CB6 and IOx are both used the Dins are mapped from btn7 onwards.
    If mode = None or is not set then the value of self.statusBoxMode
    is used.

    If mode includes 'Down' button events will be detected when pressed.
    If mode includes 'Up' button events will be detected when released.
    You can detect both types of event, noting that the event detector
    looks for transitions and ignores what it sees as the starting
    state.

    Bespoke mappings overwrite preset ones. The format for `map` is a
    list of tuples, each containing the name of the button to be mapped
    and its source, e.g. ('btn1','Din0') maps physical input Din0 to
    logical button btn1. The lowest numbered button is btn1.
    The statusBox has 17 logical buttons (btn1-17). Buttons/events can
    be mapped to multiple physical inputs and stay mapped until reset.

    `threshold` is forwarded unchanged to the parent class.

    Example:
        bits.statusBoxEnable(mode = 'Down', map = [('btn1','Din0'), ('btn2','Din1')])
    enables the statusBox to detect Down events on buttons 1 and 2
    where they are mapped to DIN0 and DIN1.

    Example:
        bits.statusBoxEnable(mode = ['Down','CB6'])
    enables the statusBox emulation to detect Down events on the
    standard CB6 IR response box keys.

    Note that the firmware in Display++ units varies over time and some
    features of this class may not work for all firmware versions.
    Display++ units can also be configured in various ways via their
    config.xml file, so this class makes certain assumptions about the
    configuration. In particular it is assumed that all digital inputs,
    triggers and analog inputs are reported as part of status updates.
    If some of these reports are disabled in your config.xml file then
    'status' and 'event' commands in this class may not work.

    Raises AssertionError if the touch screen is currently enabled.
    """
    if self.touchEnabled:
        raise AssertionError("Cannot use status Box when touch screen is on")
    parent = super(DisplayPlusPlusTouch, self)
    parent.statusBoxEnable(mode=mode, map=map, threshold=threshold)
def _statusEnable(self):
    """Overload of _statusEnable for a Display++ with touch screen.

    Sets the Bits# to continuously send back its status until stopped.
    You get a lot of data by leaving this going.
    Not normally needed by the user.

    Raises AssertionError if the touch screen is currently enabled.
    """
    if self.touchEnabled:
        raise AssertionError("Cannot use status log when touch screen is on")
    parent = super(DisplayPlusPlusTouch, self)
    parent._statusEnable()
#===================================================================#
# The getTouch, touchWait, and touchPressed commands work #
# a bit like equivalent RTBox commands. #
# #
# They do use touch logging in a thread but only do anything if #
# there is something useful on the serial buffer so you need #
# to call touchEnable before and touchDisable after any call #
# to these functions #
#===================================================================#
[docs]
def touchEnable(self):
    """Turns on the touch screen. Any presses will now be reported.

    The touch screen cannot be used together with the RTBox, the
    status box or status logging; an AssertionError is raised if any
    of them is on, or if the device does not confirm the command.
    Does nothing when the device has no comms.

    Example:
        bits.touchEnable()
        res = bits.touchWait()
        bits.touchDisable()
        print(res.time)

    Enables the touch screen, waits for a touch, disables the touch
    screen and displays the timestamp of the touch.
    """
    conflicts = (
        ('RTBoxEnabled', "Cannot use touch screen when RTBox is on"),
        ('statusBoxEnabled', "Cannot use touch screen when status Box is on"),
        ('statusEnabled', "Cannot use touch screen when status logging is on"),
    )
    for flagName, complaint in conflicts:
        if getattr(self, flagName):
            raise AssertionError(complaint)
    if self.noComms:
        return
    # Ask the Display++ to turn on the touch screen ...
    self.sendMessage(b'$EnableTouchScreen=[ON]\r')
    reply = self.read(timeout=0.1).decode("utf-8")
    # ... and check that its reply confirms it
    if 'ON' not in reply:
        raise AssertionError("Cannot enable touch screen")
    self.touchEnabled = True
    self.flush()
[docs]
def touchDisable(self):
    """Turns off the touch screen.

    Does nothing when the device has no comms. Raises AssertionError
    if the device does not confirm the command.

    Example:
        bits.touchEnable()
        res = bits.touchWait()
        bits.touchDisable()
        print(res.time)

    Enables the touch screen, waits for a touch, disables the touch
    screen and displays the timestamp of the touch.
    """
    if self.noComms:
        return
    # Ask the Display++ to turn off the touch screen ...
    self.sendMessage(b'$EnableTouchScreen=[OFF]\r')
    reply = self.read(timeout=0.1).decode("utf-8")
    # ... and check that its reply confirms it
    if 'OFF' not in reply:
        raise AssertionError("Cannot disable touch screen")
    self.touchEnabled = False
    self.flush()
[docs]
def getTouchResponses(self, N=1):
    """Checks for (at least) an appropriate number of touch screen
    presses on the input buffer then reads them.

    Returns a list of dict like objects with four members
    'x', 'y', 'dir' and 'time':
        'x' and 'y' are the x and y coordinates pressed.
        'dir' is the direction of the event,
        e.g. 'touched' for presses and 'released' for releases.
        'time' is the timestamp associated with the event.

    These values can be read as a structure:
        res=getTouchResponses(3)
        res[0].dir, res[0].x, res[0].time
    or as a dictionary:
        res[0]['dir'], res[0]['x'], res[0]['time']

    Even if only 1 response is requested the result is a 1 item long
    list of dict like objects.

    Returns None when the device has no comms, when N is 0, when fewer
    than N responses' worth of bytes are waiting on the input buffer,
    or when the bytes read contained no touch events at all.

    In theory this could be used to get multiple responses, but in
    practice the touch screen reports every slight movement, so the
    logging methods, getTouchResponse or getAllTouchResponses are
    better.

    This function does not start touch screen recording, so it should
    only be called when there appears to be data waiting: call
    touchEnable() before it.

    Example:
        bits.touchEnable()
        while not event:
            # do some processing
            continue
        res=bits.getTouchResponses(3)
        bits.touchDisable()
        print(res[0].time)

    Puts the touch screen into continuous reading while doing some
    task, and at the end gets the first 3 touches and displays the
    timestamp of the first one.
    """
    if self.noComms or N == 0:
        return None
    if self._inWaiting() <= N * self._touchSize - 1:
        # not enough bytes waiting for N touch reports
        return None
    # Works by calling startTouchLog() after the event.
    # Will wait 0.01 seconds per response requested.
    self.startTouchLog(N * 0.01)
    # Join the touch log thread and wait for it to finish.
    self.touchThread.join()
    del self.touchThread
    values = self.getTouchLog()
    self.flush()
    if values is None:
        # getTouchLog() returns None when nothing reached the queue;
        # slicing that would raise TypeError.
        return None
    return values[0:N]
[docs]
def getAllTouchResponses(self):
    """Gets all the touch screen presses from the events currently on
    the input buffer.

    The number of whole touch reports waiting is worked out from the
    number of bytes on the input buffer and the size of one report,
    and that many responses are requested from getTouchResponses(),
    whose result is returned: a list of dict like objects with four
    members 'x', 'y', 'dir' and 'time' (or None):
        'x' and 'y' are the x and y coordinates pressed.
        'dir' is the direction of the event,
        e.g. 'touched' for presses and 'released' for releases.
        'time' is the timestamp associated with the event.

    These values can be read as a structure:
        res=getAllTouchResponses()
        res[0].dir, res[0].x, res[0].time
    or as a dictionary:
        res[0]['dir'], res[0]['x'], res[0]['time']

    Even if only 1 response is found the result is a 1 item long
    list of dict like objects.

    In theory this could be used to get multiple responses, but in
    practice the touch screen reports every slight movement, so the
    logging methods are better.

    This function does not start touch screen recording, so it should
    only be called when there appears to be data waiting: call
    touchEnable() before it.

    Example:
        bits.touchEnable()
        while not event:
            # do some processing
            continue
        res=bits.getAllTouchResponses()
        bits.touchDisable()
        print(res[0].time)

    Puts the touch screen into continuous reading while doing some
    task, and at the end gets all the touches and displays the
    timestamp of the first one.
    """
    bytesWaiting = self._inWaiting()
    wholeReports = np.floor(bytesWaiting / self._touchSize)
    return self.getTouchResponses(int(wholeReports))
[docs]
def getTouchResponse(self):
    """Checks for (at least) one touch screen press on the input
    buffer then reads it.

    Returns a dict like object with four members
    'x', 'y', 'dir' and 'time':
        'x' and 'y' are the x and y coordinates pressed.
        'dir' is the direction of the event,
        e.g. 'touched' for presses and 'released' for releases.
        'time' is the timestamp associated with the event.

    These values can be read as a structure:
        res=getTouchResponse()
        res.dir, res.x, res.time
    or as a dictionary:
        res['dir'], res['x'], res['time']

    Returns None when getTouchResponses() has nothing to report
    (e.g. no comms, or no complete touch report waiting).

    This function does not start touch screen recording, so it should
    only be called when there appears to be data waiting: call
    touchEnable() before it.

    Example:
        bits.touchEnable()
        while not event:
            # do some processing
            continue
        res=bits.getTouchResponse()
        bits.touchDisable()
        print(res.time)

    Puts the touch screen into continuous reading while doing some
    task, and at the end gets the first touch and displays its
    timestamp.
    """
    res = self.getTouchResponses(1)
    if not res:
        # getTouchResponses() returns None when there is no data;
        # indexing that would raise TypeError.
        return None
    return res[0]
[docs]
def touchPressed(self,N=1):
    """Check to see if (at least) the appropriate number of
    touch screen events have been made.

    Returns True when at least N touch reports' worth of bytes are
    waiting on the input buffer, False when fewer are waiting, and
    None when the device has no comms.

    Not accurate, because touch jitter creates extra events,
    but it can be used to detect a first press.

    Example:
        bits.touchEnable()
        while not bits.touchPressed(5):
            # do some processing
            continue
        bits.touchDisable()

    Will do some processing until 5 screen touches are recorded.
    """
    if self.noComms:
        return None
    bytesWaiting = self._inWaiting()
    return not bytesWaiting < self._touchSize * N
# touchWaitN is deprecated because touch jitter creates false events
[docs]
def touchWaitN(self, N=1):
    """Waits until (at least) the appropriate number of touch screen
    events have been made, then reports the responses.

    Pauses program execution in the meantime by polling the input
    buffer in a tight loop. Returns whatever getTouchResponses(N)
    returns, or None at once when the device has no comms.

    Not very accurate, because touch jitter creates extra events.

    Example:
        bits.touchEnable()
        bits.touchWaitN(5)
        bits.touchDisable()

    Will pause until 5 screen touches are recorded.
    """
    if self.noComms:
        return None
    while True:
        # stop polling once N reports' worth of bytes are waiting
        if not self._inWaiting() < self._touchSize * N:
            break
    return self.getTouchResponses(N)
[docs]
def touchWait(self):
    """Waits until (at least) one touch screen event has been made,
    then reports the response.

    Pauses program execution in the meantime by polling the input
    buffer in a tight loop. Returns whatever getTouchResponse()
    returns, or None at once when the device has no comms.

    Example:
        bits.touchEnable()
        bits.touchWait()
        bits.touchDisable()

    Will pause until 1 screen touch is recorded.
    """
    if self.noComms:
        return None
    while True:
        # stop polling once one report's worth of bytes is waiting
        if not self._inWaiting() < self._touchSize:
            break
    return self.getTouchResponse()
#================================================================#
# Touch logging commands work a bit more like the status log #
# commands.
# You turn logging on, do other stuff, then later collect the #
# log results with getTouchLog #
#================================================================#
[docs]
def startTouchLog(self, t=60):
    """Start logging data from the touch screen.

    Turns on the touch screen if it is not already enabled, then
    starts a thread running self._touchLog for up to `t` seconds.
    The thread is kept in self.touchThread.

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        res=bits.getTouchLog()
    """
    if not self.touchEnabled:
        self.touchEnable()
    # Clear the stop flag before the logging thread exists so that the
    # thread cannot see a stale request to end.
    self.touchLogEnd = False
    # `args` must be a tuple; the one-element form works on both
    # Python 2 and Python 3, so no fallback is needed.
    self.touchThread = threading.Thread(target=self._touchLog, args=(t,))
    self.touchThread.start()
[docs]
def stopTouchLog(self):
    """Stop logging data from the touch screen.

    Signals the logging thread to end and waits for it to finish
    properly, so this can introduce a timing delay. The touch screen
    is switched off afterwards if it is still enabled.
    Does not automatically extract the touch log.

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        res=bits.getTouchLog()
    """
    # ask the logging thread to end, then wait until it has done so
    self.touchLogEnd = True
    worker = self.touchThread
    worker.join()
    del self.touchThread
    self.flush()
    if self.touchEnabled:
        self.touchDisable()
[docs]
def getTouchLog(self, caution=True, checkTime=None):
    """Reads out the touch screen queue and checks for a known error.

    Returns a list of dict like objects with four members
    'x', 'y', 'dir' and 'time', or None if the queue was empty:
        'x' and 'y' are the x and y coordinates pressed.
        'dir' is the direction of the event,
        e.g. 'touched' for presses and 'released' for releases.
        'time' is the timestamp associated with the event.

    These values can be read as a structure:
        res=getTouchLog()
        res[0].dir, res[0].x, res[0].time
    or as a dictionary:
        res[0]['dir'], res[0]['x'], res[0]['time']

    Even if only 1 touch was logged the result is a 1 item long
    list of dict like objects. The list is also kept in
    self.touchValues and its length in self.touch_nValues.

    This function does not start touch screen recording; it only
    reads what the logging thread has already placed on the queue.

    checkTime: a time between the first and second touches after
        which a warning of a possible error will be issued and the
        error may be corrected. When given it replaces self.checkTime.

    caution: if True the function checks for a known Display++ error
        (a spurious first touch left over from a previous series of
        touches). If you don't have that error, or you don't care
        about it, set this to False.

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        res=bits.getTouchLog(caution = False)

    Will get all the screen touches without checking for errors.
    """
    if checkTime is not None:
        self.checkTime = checkTime
    # Take a single snapshot of the queue length and use it throughout,
    # so the count cannot change under us if the logging thread is
    # still adding events.
    number = self.touchQ.qsize()
    if number == 0:
        self.touch_nValues = 0
        return None
    self.touchValues = [self.touchQ.get() for _ in range(number)]
    # Remember how this batch ended before anything is deleted below.
    lastTouch = self.touchValues[-1].dir
    # Display++ can sometimes issue a spurious touch event left over
    # from a previous series of touches. The following code should
    # detect and correct the error.
    # No need to worry if only one touch was recorded or the user is
    # happy to forgo checks.
    if number > 1 and caution:
        first = self.touchValues[0]
        second = self.touchValues[1]
        if first.time > second.time:
            # First timestamp is after the second. Works if the
            # Display++ clock has been reset between touch data
            # collection sessions.
            del self.touchValues[0]
            warning = ("getTouchLog: Deleted first touch as recorded "
                       "after second. This corrects an error in "
                       "the Display++")
            logging.warning(warning)
        elif self.lastTouch == 'touched':
            # The last touch recorded by the previous call was a
            # 'touched' rather than a 'released' event. This is likely
            # to cause the error, but won't capture an error left over
            # from the last run of an experiment that used the touch
            # screen.
            del self.touchValues[0]
            warning = ("getTouchLog: Deleted first touch as the "
                       "last previously recorded touch event "
                       "was not a release and this can indicate an "
                       "error in the Display++. However, this could be the "
                       "wrong thing to have done")
            logging.warning(warning)
        elif (second.time - first.time) > self.checkTime:
            # The gap between the first two timestamps is larger than
            # checkTime. Due to finger jitter this is unlikely if the
            # first touch is real, but it is not fool proof: it could
            # delete a very clean first touch. On the first call of a
            # run the chances are it is a good touch, hence just a
            # warning in that case.
            warning = ("getTouchLog: Gap between first and second "
                       "touches is large normally finger jitter means "
                       "it is quite short.")
            logging.warning(warning)
            if not self.touchFirstTime:
                del self.touchValues[0]
                warning = ("getTouchLog: Deleted first touch as "
                           "the gap between first "
                           "and second touches is large "
                           "and this is not the first call of the run. "
                           "But this could be the wrong thing to have "
                           "done.")
                logging.warning(warning)
    self.lastTouch = lastTouch
    self.touchFirstTime = False
    self.touch_nValues = len(self.touchValues)
    return self.touchValues
#===============================================#
# Helper function for touch screen commands. #
# Normally run in its own thread. #
#===============================================#
def _touchLog(self, args=(60,)):
    """Gets raw touch screen events and puts them in a Queue.

    Reads from the device until `args` seconds have passed or
    self.touchLogEnd becomes True, then turns the touch screen off and
    parses what was read. Each '$touch' line becomes one touch object
    placed on self.touchQ; other lines are skipped with a warning.

    args: the time to run for, in seconds. Either a bare number (which
        is what the thread created by startTouchLog delivers) or a
        one-element tuple/list such as the default.

    Normally run in its own thread; not normally needed by the user.
    """
    # Unwrap the duration: comparing a tuple with a float would raise
    # TypeError on Python 3.
    t = args[0] if isinstance(args, (tuple, list)) else args
    sT = clock()  # start time
    chunks = []
    # While not timed out and until touchLogEnd is true.
    # When run in a thread touchLogEnd can be set from outside this
    # function.
    while (clock() - sT < t) and not self.touchLogEnd:
        smsg = self.read(timeout=0.1)
        if smsg:
            chunks.append(smsg.decode("utf-8"))
    self.touchDisable()  # Turn off touch screen.
    self.touchLogEnd = True  # Make sure this function is marked as ending.
    self.flush()
    msg = "".join(chunks)
    if not msg:
        return
    # Lines are separated by CR
    for line in msg.split('\r'):
        if not line.strip():
            # Blank fragment, e.g. the one after the final CR: not data.
            continue
        v = line.split(';')  # Split line into components at ;
        if '$touch' in v[0]:
            # Grab the details of the event into a structure.
            try:
                value = touch()
                value.time = float(v[1])
                value.x = int(float(v[2]))
                value.y = int(float(v[3]))
                if int(float(v[4])) == 1:
                    value.dir = 'touched'
                else:
                    value.dir = 'released'
            except (IndexError, ValueError):
                # A truncated or corrupt report must not end the thread
                # and lose every event that follows it.
                warning = ("_touchLog found"
                           " a malformed touch event so skipping that")
                logging.warning(warning)
                continue
            self.touchQ.put(value)
        elif '#status' in v[0]:  # Got a status event by mistake
            warning = ("_touchLog found"
                       " status on input so skipping that")
            logging.warning(warning)
        else:  # Have picked up some other stuff on the input
            warning = ("_touchLog found"
                       " unknown data on input so skipping that")
            logging.warning(warning)
#============================================================#
# Touch event functions can be used to get a list of more #
# meaningful events following any getTouch commands #
# works a bit like an eye movement gaze detector #
#============================================================#
[docs]
def setTouchEventParams(self, distance=None, t=None ,type=None):
    """Sets the parameters for touch event detection.

    distance: how far the touch should move to count as a new touch
        (stored in self.touchDistance).
    t: how much time should elapse between touches for the second one
        to count (stored in self.touchTime).
    type: the event direction(s) to look for, e.g. 'touched' or
        ['touched', 'released'] (stored in self.touchType).

    Any parameter left as None keeps its current value.

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        res=bits.getTouchLog()
        bits.setTouchEventParams(distance=20, t=0.001, type='touched')
        events=bits.getTouchEvents()

    Will extract touch events (not releases) that are 20 pixels and
    1 ms apart.
    """
    # Identity tests: `!= None` is evaluated element-wise for array-like
    # values and then cannot be used as a condition.
    if distance is not None:
        self.touchDistance = distance
    if t is not None:
        self.touchTime = t
    if type is not None:
        self.touchType = type
[docs]
def getTouchEvents(self, distance=None, t=None, type=None):
    """Scans the touch log to extract touch events.

    You need to run getTouchLog before you run this function, because
    it works on self.touchValues. `distance`, `t` and `type` are first
    passed to setTouchEventParams().

    A logged touch becomes an event when it is sufficiently far from
    the last recorded event in both distance and time, or when its
    direction differs from that of the last recorded event and is one
    of the directions being looked for.

    Returns a list of dict like structures with members
    time, x, y, and dir:
        time is the time stamp of the event.
        x and y are the x and y locations of the event.
        dir is the type of event: 'touched' or 'released'.
    The list is also stored in self.touchEvents and its length in
    self.touch_nEvents.

    These values can be read as a structure:
        res=getTouchEvents()
        res[0].dir, res[0].x, res[0].time
    or as a dictionary:
        res[0]['dir'], res[0]['x'], res[0]['time']

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        bits.getTouchLog()
        res=bits.getTouchEvents(distance=20, t=0.001, type='touched')
        print(res[0]['time'])

    Will extract touch events (not releases) that are 20 pixels and
    1 ms apart.
    """
    self.setTouchEventParams(distance, t, type)
    self.touchEvents = []
    # Details of the last recorded event; the starting values are
    # placeholders chosen far away from any plausible sample.
    refTime = -999999
    refX = -999999
    refY = -999999
    refDir = 'None'
    for sample in self.touchValues:
        separation = (((sample.x - refX)**2.0)
                      + ((sample.y - refY)**2.0))**0.5
        elapsed = sample.time - refTime
        if ((separation > self.touchDistance
                and elapsed > self.touchTime)
                or (refDir != sample.dir
                    and sample.dir in self.touchType)):
            record = touch()
            self.touchEvents.append(record)
            record.time = sample.time
            record.x = sample.x
            record.y = sample.y
            record.dir = sample.dir
            # this sample is now the reference for the ones after it
            refTime = sample.time
            refX = sample.x
            refY = sample.y
            refDir = sample.dir
    self.touch_nEvents = len(self.touchEvents)
    return self.touchEvents
[docs]
def getTouchEvent(self, N=0, distance=None, t=None, type=None):
    """Scans the touch log to return the Nth touch event.

    You need to run getTouchLog before you run this function.
    `distance`, `t` and `type` are passed on to getTouchEvents() and
    item N (counting from 0) of its result is returned; an IndexError
    results if there is no such event.

    Returns a dict like structure with members time, x, y, and dir:
        time is the time stamp of the event.
        x and y are the x and y locations of the event.
        dir is the type of event: 'touched' or 'released'.

    These values can be read as a structure:
        res=getTouchEvent(0)
        res.dir, res.x, res.time
    or as a dictionary:
        res['dir'], res['x'], res['time']

    Example:
        bits.startTouchLog()
        while not event:
            #do some processing
            continue
        bits.stopTouchLog()
        bits.getTouchLog()
        res=bits.getTouchEvent(N=10, distance=20, t=0.001, type='touched')
        print(res.time)

    Will extract item 10 of the touch events (ignoring releases) that
    are 20 pixels and 1 ms apart.
    """
    return self.getTouchEvents(distance, t, type)[N]
class Config:
def __init__(self, bits):
    """Remember the device this configuration belongs to and try to
    load a previously saved configuration for it.

    bits: the Bits# (or similar) device object.
    """
    # NOTE(review): the original comment asked for a weakref here to
    # avoid circular references, but a plain reference is what is stored.
    self.bits = bits
    # try to fetch a previous config file
    self.load()
    # 0 means "no log file"; replace with a file handle if one is opened
    self.logFile = 0
def load(self, filename=None):
    """Load a previously saved configuration.

    filename: path of the config file. If None, 'crs_bits.cfg' in the
        PsychoPy user preferences directory is used.

    On success self.os, self.gfxCard and self.identityLUT (a 256 x 3
    array with the r, g and b columns) are set and True is returned.
    If the file does not exist the three attributes are set to None
    and False is returned.
    """
    def parseLUTLine(line):
        # Values are stored as the text of a list: "[0.0, 0.0039, ...]"
        stripped = line.replace('[', '').replace(']', '')
        return [float(item) for item in stripped.split(',')]

    if filename is None:
        from psychopy import prefs
        filename = os.path.join(prefs.paths['userPrefsDir'],
                                'crs_bits.cfg')
    if not os.path.exists(filename):
        logging.warning('no config file yet for %s' % self.bits)
        self.identityLUT = None
        self.gfxCard = None
        self.os = None
        return False

    config = configparser.RawConfigParser()
    with open(filename) as f:
        # readfp() no longer exists on Python 3.12+, while read_file()
        # is missing from Python 2's ConfigParser: use whichever the
        # parser offers.
        reader = getattr(config, 'read_file', None) or config.readfp
        reader(f)
    self.os = config.get('system', 'os')
    self.gfxCard = config.get('system', 'gfxCard')
    self.identityLUT = np.ones([256, 3])
    for column, channel in enumerate('rgb'):
        self.identityLUT[:, column] = parseLUTLine(
            config.get('identityLUT', channel))
    return True
def _getGfxCardString(self):
    """Return "<renderer>: <version>" as reported by pyglet's gl_info."""
    from pyglet.gl import gl_info
    renderer = gl_info.get_renderer()
    version = gl_info.get_version()
    return "%s: %s" % (renderer, version)
def _getOSstring(self):
    """Return the string that platform.platform() gives for this system."""
    import platform
    description = platform.platform()
    return description
def save(self, filename=None):
    """Save the current system description and identity LUT to disk.

    filename: path of the config file to write. If None,
        'crs_bits.cfg' in the PsychoPy user preferences directory is
        used.

    self.os and self.gfxCard are updated to describe the current
    system, and self.identityLUT (indexed as [:, 0..2] for r, g, b) is
    written in the format that load() reads back.
    """
    if filename is None:
        from psychopy import prefs
        filename = os.path.join(prefs.paths['userPrefsDir'],
                                'crs_bits.cfg')
    # create the config object
    config = configparser.RawConfigParser()
    config.add_section('system')
    # config.set() returns None, so keep the strings themselves rather
    # than its return value.
    self.os = self._getOSstring()
    self.gfxCard = self._getGfxCardString()
    config.set('system', 'os', self.os)
    config.set('system', 'gfxCard', self.gfxCard)
    # save the current LUT
    config.add_section('identityLUT')
    lut = np.asarray(self.identityLUT)
    for column, channel in enumerate('rgb'):
        # tolist() yields plain Python numbers, so the stored text is
        # "[0.0, 0.5, ...]" whatever repr NumPy gives its own scalars.
        channelValues = lut[:, column].tolist()
        config.set('identityLUT', channel, str(channelValues))
    # save it to disk
    with open(filename, 'w') as fileObj:
        config.write(fileObj)
    # only report success once the file has actually been written
    logging.info("Saved %s configuration to %s" % (self.bits, filename))
def quickCheck(self):
    """Compare this machine against the configuration last loaded/saved.

    The graphics card string is compared first and the OS string second;
    a warning is logged for the first one found to differ.

    :returns:

        1 if both strings match the stored ones, otherwise 0
    """
    cardUnchanged = (self._getGfxCardString() == self.gfxCard)
    if not cardUnchanged:
        logging.warn("The graphics card or its driver has changed. "
                     "We'll re-check the identity LUT for the card")
        return 0

    osUnchanged = (self._getOSstring() == self.os)
    if not osUnchanged:
        logging.warn("The OS has been changed/updated. We'll re-check"
                     " the identity LUT for the card")
        return 0

    # all seems the same as before
    return 1
def testLUT(self, LUT=None, demoMode=False):
    """Apply a LUT to the graphics card gamma table and test whether
    we get back 0:255 in all channels.

    :params:

        LUT: The lookup table to be tested (256x3).
            If None then the LUT will not be altered
        demoMode: if True, draw the test ramp and flip the window but
            return before reading a video line back from the device

    :returns:

        the video line read back from the device minus the expected
        0:255 ramp (256 x 3). In demoMode a list of 256 zeros is
        returned instead
    """
    # local import (as is done for `prefs` elsewhere in this class) so
    # that `visual` is guaranteed to be in scope in this method
    from psychopy import visual

    bits = self.bits
    win = bits.win
    if LUT is not None:
        win.gammaRamp = LUT
    # create the patch of stimulus to test
    expectedVals = list(range(256))
    w, h = win.size
    # NB psychopy uses -1:1
    testArrLums = np.resize(np.linspace(-1, 1, 256), [256, 256])
    stim = visual.ImageStim(win, image=testArrLums, size=[256, h],
                            pos=[128 - w//2, 0], units='pix')
    expected = np.repeat(expectedVals, 3).reshape([-1, 3])
    stim.draw()
    # make sure the frame buffer was correct (before gamma was applied)
    frm = np.array(win.getMovieFrame(buffer='back'))
    # np.alltrue() was removed in NumPy 2.0; np.all() is the equivalent
    assert np.all(frm[0, 0:256, 0] == expectedVals)
    win.flip()
    # use bits sharp to test
    if demoMode:
        # NOTE(review): this is a flat list, not a 256 x 3 array like
        # the non-demo return value - confirm that callers cope with it
        return [0] * 256
    pixels = bits.getVideoLine(lineN=50, nPixels=256)
    errs = pixels - expected
    if self.logFile:
        # one line per colour channel: the letter then every pixel value
        for ii, channel in enumerate('RGB'):
            self.logFile.write(channel)
            for pixVal in pixels[:, ii]:
                self.logFile.write(', %i' % pixVal)
            self.logFile.write('\n')
    return errs
def findIdentityLUT(self, maxIterations=1000, errCorrFactor=1.0/5000,
                    nVerifications=50,
                    demoMode=True,
                    logFile=''):
    """Search for the identity LUT for this card/operating system.

    This requires that the window being tested is fullscreen on the Bits#
    monitor (or at least occupies the first 256 pixels in the top left
    corner!)

    A set of standard candidate LUTs is tested first. If the best of
    them gives zero error it is stored and saved straight away.
    Otherwise the best candidate is corrected iteratively until
    `nVerifications` tests in a row give zero error, the error stops
    changing, or `maxIterations` is reached.

    :params:

        maxIterations: maximum number of correction iterations
        errCorrFactor: amount of correction done for each iteration
        nVerifications: number of repeats (successful) to check
            dithering has been eradicated
        demoMode: generate the screen but don't go into status mode.
            NOTE(review): it is only passed to testLUT() for the
            candidate LUTs, and testLUT() returns a plain list in demo
            mode which `abs(errs).mean()` below does not accept -
            confirm the intended use
        logFile: path of a file that testLUT() writes pixel values to
            while the search runs ('' for no log). The file is closed
            and `self.logFile` reset to 0 when the search ends

    :returns:

        None. On success the LUT is stored in `self.identityLUT` and
        written to disk by `save()`
    """
    t0 = time.time()
    # create standard options
    intel = np.linspace(.05, .95, 256)
    one = np.linspace(0, 1.0, 256)
    fraction = np.linspace(0.0, 65535.0/65536.0, num=256)
    # NOTE(review): '0-65535' and '1-65536' are built from the same
    # values - confirm whether the second was meant to differ
    LUTs = {'intel': np.repeat(intel, 3).reshape([-1, 3]),
            '0-255': np.repeat(one, 3).reshape([-1, 3]),
            '0-65535': np.repeat(fraction, 3).reshape([-1, 3]),
            '1-65536': np.repeat(fraction, 3).reshape([-1, 3])}

    logHandle = None
    if logFile:
        logHandle = open(logFile, 'w')
        self.logFile = logHandle
    try:
        if plotResults:
            pyplot.figure()
            pyplot.subplot(1, 2, 1)
            pyplot.plot([0, 255], [0, 255], '-k')
            errPlot = pyplot.plot(list(range(256)), list(range(256)),
                                  '.r')[0]
            pyplot.subplot(1, 2, 2)
            pyplot.plot(200, 0.01, '.w')
            pyplot.show(block=False)

        # first pass: see which of the standard LUTs is closest
        lowestErr = 1000000000
        bestLUTname = None
        logging.flush()
        for LUTname, currentLUT in list(LUTs.items()):
            sys.stdout.write('Checking %r LUT:' % LUTname)
            errs = self.testLUT(currentLUT, demoMode)
            if plotResults:
                errPlot.set_ydata(list(range(256)) + errs[:, 0])
                pyplot.draw()
            meanErr = abs(errs).mean()
            print('mean err = %.3f per LUT entry' % meanErr)
            if meanErr < abs(lowestErr):
                lowestErr = meanErr
                bestLUTname = LUTname
        if lowestErr == 0:
            msg = "The %r identity LUT produced zero error. We'll use that!"
            # report the LUT that is actually stored, not whichever one
            # happened to be tested last
            print(msg % bestLUTname)
            self.identityLUT = LUTs[bestLUTname]
            # it worked so save this configuration for future
            self.save()
            return

        msg = "Best was %r LUT (mean err = %.3f). Optimising that..."
        print(msg % (bestLUTname, lowestErr))
        currentLUT = LUTs[bestLUTname]
        errProgression = []
        corrInARow = 0
        for n in range(maxIterations):
            errs = self.testLUT(currentLUT)
            # nudge every entry against its measured error, staying
            # inside the valid 0-1 range
            tweaks = errs * errCorrFactor
            currentLUT -= tweaks
            currentLUT[currentLUT > 1] = 1.0
            currentLUT[currentLUT < 0] = 0.0
            meanErr = abs(errs).mean()
            errProgression.append(meanErr)
            if plotResults:
                errPlot.set_ydata(list(range(256)) + errs[:, 0])
                pyplot.subplot(1, 2, 2)
                # black marker for an error-free test, red otherwise
                point = '.k' if meanErr == 0 else '.r'
                pyplot.plot(n, meanErr, point)
                pyplot.draw()
            if meanErr > 0:
                sys.stdout.write("%.3f " % meanErr)
                corrInARow = 0
            else:
                sys.stdout.write(". ")
                corrInARow += 1
            if corrInARow >= nVerifications:
                print('success in a total of %.1fs' % (time.time() - t0))
                self.identityLUT = currentLUT
                # it worked so save this configuration for future
                self.save()
                break
            elif (len(errProgression) > 10 and
                    max(errProgression) - min(errProgression) < 0.001):
                print("Trying to correct the gamma table was having no "
                      "effect. Make sure the window was fullscreen and "
                      "on the Bits# screen")
                break
        else:
            # the loop ran out of iterations without either break above
            # (this also covers maxIterations == 0)
            print("failed to converge on a successful identity LUT. "
                  "This is BAD!")

        if plotResults:
            pyplot.figure(figsize=[18, 12])
            pyplot.subplot(1, 3, 1)
            pyplot.plot(errProgression)
            pyplot.title('Progression of errors')
            pyplot.ylabel("Mean error per LUT entry (0-1)")
            pyplot.xlabel("Test iteration")
            r256 = np.reshape(list(range(256)), [256, 1])
            pyplot.subplot(1, 3, 2)
            pyplot.plot(r256, r256, 'k-')
            pyplot.plot(r256, currentLUT[:, 0] * 255, 'r.',
                        markersize=2.0)
            pyplot.plot(r256, currentLUT[:, 1] * 255, 'g.',
                        markersize=2.0)
            pyplot.plot(r256, currentLUT[:, 2] * 255, 'b.',
                        markersize=2.0)
            pyplot.title('Final identity LUT')
            pyplot.ylabel("LUT value")
            pyplot.xlabel("LUT entry")
            pyplot.subplot(1, 3, 3)
            deviations = currentLUT - r256/255.0
            pyplot.plot(r256, deviations[:, 0], 'r.')
            pyplot.plot(r256, deviations[:, 1], 'g.')
            pyplot.plot(r256, deviations[:, 2], 'b.')
            pyplot.title('LUT deviations from sensible')
            pyplot.ylabel("LUT value")
            pyplot.xlabel("LUT deviation (multiples of 1024)")
            pyplot.savefig("bitsSharpIdentityLUT.pdf")
            pyplot.show()
    finally:
        # don't leak the log file opened above, and go back to the
        # "no log file" value so testLUT() stops writing to it
        if logHandle is not None:
            logHandle.close()
            self.logFile = 0
# Some properties for which we need weakref pointers, not std properties
@property
def bits(self):
    """The Bits box to which this config object refers

    Only a weak reference is stored (under 'bits' in the instance
    `__dict__`); this returns whatever that reference currently yields,
    or None if nothing has been stored.
    """
    boxRef = self.__dict__.get('bits')
    return None if boxRef is None else boxRef()


@bits.setter
def bits(self, bits):
    # keep just a weak reference to the box in the instance dict
    boxRef = weakref.ref(bits)
    self.__dict__['bits'] = boxRef
[docs]
def init():
    """DEPRECATED: we used to initialise Bits++ via the compiled dll

    This only ever worked on windows and BitsSharp doesn't need it at all

    Note that, by default, Bits++ will perform gamma correction
    that you don't want (unless you have the CRS calibration device)
    (Recommended that you use the BitsPlusPlus class rather than
    calling this directly)

    :returns:

        whatever `_bits.bitsInit()` returns, or False if the dll is not
        available or the call raised an exception
    """
    if not haveBitsDLL:
        return False
    try:
        return _bits.bitsInit()  # returns null if fails?
    except Exception:
        logging.error('bits.init() barfed!')
        return False
[docs]
def setVideoMode(videoMode):
    """Set the video mode of the Bits++ (win32 only)

    bits8BITPALETTEMODE = 0x00000001  # normal vsg mode

    NOGAMMACORRECT = 0x00004000  # No gamma correction mode

    GAMMACORRECT = 0x00008000  # Gamma correction mode

    VIDEOENCODEDCOMMS = 0x00080000

    (Recommended that you use the BitsLUT class rather than
    calling this directly)

    :returns:

        the result of `_bits.bitsSetVideoMode(videoMode)` when the dll
        is available, otherwise 1
    """
    if not haveBitsDLL:
        return 1
    return _bits.bitsSetVideoMode(videoMode)
[docs]
def reset(noGamma=True):
    """Reset the Bits++ box via the USB cable by initialising again

    Allows the option to turn off gamma correction: when `noGamma` is
    true and `init()` reported success, the video mode is then set to
    NOGAMMACORRECT.
    """
    initialised = init()
    if not (noGamma and initialised):
        return
    setVideoMode(NOGAMMACORRECT)