Rev 139 | Blame | Last modification | View Log | RSS feed
#!/usr/bin/env python### Copyright 2010 TheSeven, benedikt93, Farthen### This file is part of emBIOS.## emBIOS is free software: you can redistribute it and/or# modify it under the terms of the GNU General Public License as# published by the Free Software Foundation, either version 2 of the# License, or (at your option) any later version.## emBIOS is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.# See the GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with emBIOS. If not, see <http://www.gnu.org/licenses/>.##import sysimport structimport usb.coreimport libembiosdataclass Error(Exception):def __init__(self, value=None):self.value = valuedef __str__(self):if self.value != None:return repr(self.value)class ArgumentError(Error):passclass DeviceNotFoundError(Error):passclass DeviceError(Error):passclass SendError(Error):passclass ReceiveError(Error):passclass Bunch(dict):"""This is a dict whose items can also be accessed withbunchinstance.something."""def __init__(self, **kw):dict.__init__(self, kw)self.__dict__ = selfdef __getstate__(self):return selfdef __setstate__(self, state):self.update(state)self.__dict__ = selfclass Embios(object):def __init__(self):self.lib = Lib(self)@staticmethoddef _alignsplit(addr, size, blksize, align):end = addr + sizeif addr & (align - 1):bodyaddr = (addr + min(size, blksize)) & ~(align - 1)else: bodyaddr = addrheadsize = bodyaddr - addrif (size - headsize) & (align - 1):tailaddr = (end - min(end - bodyaddr, blksize) + align - 1) & ~(align - 1)else: tailaddr = endtailsize = end - tailaddrreturn (headsize, tailaddr - bodyaddr, tailsize)def getversioninfo(self):""" This returns the emBIOS version and device information. """return self.lib.monitorcommand(struct.pack("IIII", 1, 0, 0, 0), "IBBBBI", ("revision", "majorv", "minorv", "patchv", "swtypeid", "hwtypeid"))def getpacketsizeinfo(self):""" This returns the emBIOS max packet size information.It also sets the properties of the device object accordingly."""resp = self.lib.monitorcommand(struct.pack("IIII", 1, 1, 0, 0), "HHII", ("coutmax", "cinmax", "doutmax", "dinmax"))self.lib.dev.packetsizelimit['cout'] = resp.coutmaxself.lib.dev.packetsizelimit['cin'] = resp.cinmaxself.lib.dev.packetsizelimit['din'] = resp.dinmaxself.lib.dev.packetsizelimit['dout'] = resp.doutmaxreturn respdef getusermemrange(self):""" This returns the memory range the user has access to. """return self.lib.monitorcommand(struct.pack("IIII", 1, 2, 0, 0), "III", ("lower", "upper", None))def reset(self, force=False):""" Reboot the device """if force:return self.lib.monitorcommand(struct.pack("IIII", 2, 0, 0, 0))else:return self.lib.monitorcommand(struct.pack("IIII", 2, 1, 0, 0), "III", (None, None, None))def poweroff(self, force=False):""" Powers the device off. """if force:return self.lib.monitorcommand(struct.pack("IIII", 3, 0, 0, 0))else:return self.lib.monitorcommand(struct.pack("IIII", 3, 1, 0, 0), "III", (None, None, None))def read(self, addr, size):""" Reads the memory from location 'addr' with size 'size'from the device. This cares about too long packagesand decides whether to use DMA or not."""if not self.lib.connected:self.lib.connect()cin_maxsize = self.lib.dev.packetsizelimit["cin"] - 0x10din_maxsize = self.lib.dev.packetsizelimit["din"]data = ""(headsize, bodysize, tailsize) = self._alignsplit(addr, size, cin_maxsize, 16)if headsize != 0:data += self.readmem(addr, headsize)addr += headsizewhile bodysize > 0:if bodysize >= 2 * cin_maxsize:readsize = min(bodysize, din_maxsize)data += self.readdma(addr, readsize)else:readsize = min(bodysize, cin_maxsize)data += self.readmem(addr, readsize)addr += readsizebodysize -= readsizeif tailsize != 0:data += self.readmem(addr, tailsize)return datadef write(self, addr, data):""" Writes the data in 'data' to the location 'addr'in the memory of the device. This cares about too long packagesand decides whether to use DMA or not."""if not self.lib.connected:self.lib.connect()cout_maxsize = self.lib.dev.packetsizelimit["cout"] - 0x10dout_maxsize = self.lib.dev.packetsizelimit["dout"](headsize, bodysize, tailsize) = self._alignsplit(addr, len(data), cout_maxsize, 16)offset = 0if headsize != 0:self.writemem(addr, headsize)offset += headsizeaddr += headsizewhile bodysize > 0:if bodysize >= 2 * cout_maxsize:writesize = min(bodysize, dout_maxsize)self.writedma(addr, data[offset:offset+writesize])else:writesize = min(bodysize, cout_maxsize)self.writemem(addr, data[offset:offset+writesize])offset += writesizeaddr += writesizebodysize -= writesizeif tailsize != 0:self.writemem(addr, data[offset:offset+tailsize])return datadef readmem(self, addr, size):""" Reads the memory from location 'addr' with size 'size'from the device."""resp = self.lib.monitorcommand(struct.pack("IIII", 4, addr, size, 0), "III%ds" % size, (None, None, None, "data"))return resp.datadef writemem(self, addr, data):""" Writes the data in 'data' to the location 'addr'in the memory of the device."""return self.lib.monitorcommand(struct.pack("IIII%ds" % len(data), 5, addr, len(data), 0, data), "III", (None, None, None))def readdma(self, addr, size):""" Reads the memory from location 'addr' with size 'size'from the device. This uses DMA and the data in endpoint."""self.lib.monitorcommand(struct.pack("IIII", 6, addr, size, 0), "III", (None, None, None))return struct.unpack("%ds" % size, self.lib.dev.din(size))[0]def writedma(self, addr, data):""" Writes the data in 'data' to the location 'addr'in the memory of the device. This uses DMA and the data out endpoint."""self.lib.monitorcommand(struct.pack("IIII", 7, addr, len(data), 0), "III", (None, None, None))return self.lib.dev.dout(data)def i2cread(self, index, slaveaddr, startaddr, size):""" Reads data from an i2c slave """def i2cwrite(self, index, slaveaddr, startaddr, data):""" Writes data to an i2c slave """def usbcread(self, size):""" Reads data with size 'size' from the USB console """def usbcwrite(self, data):""" Writes data to the USB console """def cread(self, size, bitmask):""" Reads data with the specified size from the device consolesidentified with the specified bitmask"""def cwrite(self, data):""" Writes data to the device consolesidentified with the specified bitmask."""def cflush(self, bitmask):""" Flushes the consoles specified with 'bitmask' """return self.lib.monitorcommand(struct.pack("IIII", 14, bitmask, 0, 0), "III", (None, None, None))def getprocinfo(self, offset, size):""" Gets current state of the scheduler """def freezescheduler(self, freeze=True):""" Freezes/Unfreezes the scheduler """return self.lib.monitorcommand(struct.pack("IIII", 16, 1 if freeze else 0, 0, 0), "III", ("before", None, None))def unfreezescheduler(self):""" Unfreezes the scheduler """return self.lib.monitorcommand(struct.pack("IIII", 16, 0, 0, 0), "III", ("before", None, None))def suspendthread(self, id, suspend=True):""" Suspends the thread with the specified id """return self.lib.monitorcommand(struct.pack("IIII", 17, 1 if suspend else 0, id, 0), "III", ("before", None, None))def unsuspendthread(self, id):""" Suspends the thread with the specified id """return self.lib.monitorcommand(struct.pack("IIII", 17, 0, id, 0), "III", ("before", None, None))def killthread(self, id):""" Kills the thread with the specified id """return self.lib.monitorcommand(struct.pack("IIII", 18, id, 0, 0), "III", ("before", None, None))def createthread(self, nameptr, entrypoint, stackptr, stacksize, threadtype, priority, state):""" Creates a thread with the specified attributes """if threadtype == "user":threadtype = 0elif threadtype == "system":threadtype = 1else:raise SyntaxError("Threadtype must be either 'system' or 'user'")if priority > 256 or priority < 0:raise SyntaxError("Priority must be a number between 0 and 256")if state == "ready":state = 0elif state == "suspended":state = 1else:raise SyntaxError("State must be either 'ready' or 'suspended'")resp = self.lib.monitorcommand(struct.pack("IIIIIIII", 19, nameptr, entrypoint, stackptr, stacksize, threadtype, priority, state), "III", (id, None, None))if resp.id < 0:raise DeviceError("The device returned the error code "+str(resp.id))return respdef flushcpucache(self):""" Flushes the CPU instruction and data cache """return self.lib.monitorcommand(struct.pack("IIII", 20, 0, 0, 0), "III", (None, None, None))def run(self, addr):""" Runs the emBIOS app at 'addr' """return self.lib.monitorcommand(struct.pack("IIII", 21, addr, 0, 0), "III", ("excecimage", None, None))def bootflashread(self, memaddr, flashaddr, size):""" Copies the data in the bootflash at 'flashaddr' of the specified sizeto the memory at addr 'memaddr'"""def bootflashwrite(self, memaddr, flashaddr, size):""" Copies the data in the memory at 'memaddr' of the specified sizeto the boot flash at addr 'flashaddr'"""def execfirmware(self, addr):""" Executes the firmware at 'addr' and passes all control to it. """return self.lib.monitorcommand(struct.pack("IIII", 24, addr, 0, 0), "III", (None, None, None))def aesencrypt(self, addr, size, keyindex):""" Encrypts the buffer at 'addr' with the specified sizewith the hardware AES key index 'keyindex'"""return self.lib.monitorcommand(struct.pack("IBBHII", 25, 1, 0, keyindex, addr, size), "III", (None, None, None))def aesdecrypt(self, addr, size, keyindex):""" Decrypts the buffer at 'addr' with the specified sizewith the hardware AES key index 'keyindex'"""return self.lib.monitorcommand(struct.pack("IBBHII", 25, 0, 0, keyindex, addr, size), "III", (None, None, None))def hmac_sha1(self, addr, size, destination):""" Generates a HMAC-SHA1 hash of the buffer and saves it to 'destination' """return self.lib.monitorcommand(struct.pack("IIII", 26, addr, size, destination), "III", (None, None, None))class Lib(object):def __init__(self, embios):self.idVendor = 0xFFFFself.idProduct = 0xE000self.embios = embiosself.connected = Falsedef connect(self):self.dev = Dev(self.idVendor, self.idProduct)self.connected = Trueself.embios.getpacketsizeinfo()def monitorcommand(self, cmd, rcvdatatypes=None, rcvstruct=None):if not self.connected:self.connect()self.dev.cout(cmd)if rcvdatatypes:rcvdatatypes = "I" + rcvdatatypes # add the responsedata = self.dev.cin(struct.calcsize(rcvdatatypes))data = struct.unpack(rcvdatatypes, data)response = data[0]if libembiosdata.responsecodes[response] == "ok":if rcvstruct:datadict = Bunch()counter = 1 # start with 1, 0 is the idfor item in rcvstruct:if item != None: # else the data is undefineddatadict[item] = data[counter]counter += 1return datadictelse:return dataelif libembiosdata.responsecodes[response] == "unsupported":raise DeviceError("The device does not support this command.")elif libembiosdata.responsecodes[response] == "invalid":raise DeviceError("Invalid command! This should NOT happen!")elif libembiosdata.responsecodes[response] == "busy":raise DeviceError("Device busy")class Dev(object):def __init__(self, idVendor, idProduct):self.idVendor = idVendorself.idProduct = idProductself.interface = 0self.timeout = 100self.connect()self.findEndpoints()self.packetsizelimit = {}self.packetsizelimit['cout'] = Noneself.packetsizelimit['cin'] = Noneself.packetsizelimit['dout'] = Noneself.packetsizelimit['din'] = Nonedef __del__(self):self.disconnect()def findEndpoints(self):epcounter = 0self.endpoint = {}for cfg in self.dev:for intf in cfg:for ep in intf:if epcounter == 0:self.endpoint['cout'] = ep.bEndpointAddresselif epcounter == 1:self.endpoint['cin'] = ep.bEndpointAddresselif epcounter == 2:self.endpoint['dout'] = ep.bEndpointAddresselif epcounter == 3:self.endpoint['din'] = ep.bEndpointAddressepcounter += 1if epcounter <= 3:raise DeviceError("Not all endpoints found in the descriptor. Only "+str(epcounter)+" found, we need 4")def connect(self):self.dev = usb.core.find(idVendor=self.idVendor, idProduct=self.idProduct)if self.dev is None:raise DeviceNotFoundError()self.dev.set_configuration()def disconnect(self):passdef send(self, endpoint, data):size = self.dev.write(endpoint, data, self.interface, self.timeout)if size != len(data):raise SendErrorreturn lendef receive(self, endpoint, size):read = self.dev.read(endpoint, size, self.interface, self.timeout)if len(read) != size:raise ReceiveErrorreturn readdef cout(self, data):if self.packetsizelimit['cout'] and len(data) > self.packetsizelimit['cout']:raise SendError("Packet too big")return self.send(self.endpoint['cout'], data)def cin(self, size):if self.packetsizelimit['cin'] and size > self.packetsizelimit['cin']:raise ReceiveError("Packet too big")return self.receive(self.endpoint['cin'], size)def dout(self, data):if self.packetsizelimit['dout'] and len(data) > self.packetsizelimit['dout']:raise SendError("Packet too big")return self.send(self.endpoint['dout'], data)def din(self, size):if self.packetsizelimit['din'] and size > self.packetsizelimit['din']:raise ReceiveError("Packet too big")return self.receive(self.endpoint['din'], size)if __name__ == "__main__":# Some testsimport sysembios = Embios()resp = embios.getversioninfo()sys.stdout.write("Embios device version information: " + libembiosdata.swtypes[resp.swtypeid] + " v" + str(resp.majorv) + "." + str(resp.minorv) +"." + str(resp.patchv) + " r" + str(resp.revision) + " running on " + libembiosdata.hwtypes[resp.hwtypeid] + "\n")resp = embios.getusermemrange()sys.stdout.write("Usermemrange: "+hex(resp.lower)+" - "+hex(resp.upper)+"\n")memaddr = resp.lowermaxlen = resp.upper - resp.lowerf = open("./embios.py", "rb")sys.stdout.write("Loading test file (embios.py) to send over USB...\n")datastr = f.read()[:maxlen]sys.stdout.write("Sending data...\n")embios.write(memaddr, datastr)sys.stdout.write("Encrypting data with the hardware key...\n")embios.aesencrypt(memaddr, len(datastr), 0)sys.stdout.write("Reading data back and saving it to 'libembios-test-encrypted.bin'...\n")f = open("./libembios-test-encrypted.bin", "wb")f.write(embios.read(memaddr, len(datastr)))sys.stdout.write("Decrypting the data again...\n")embios.aesdecrypt(memaddr, len(datastr), 0)sys.stdout.write("Reading data back from device...\n")readdata = embios.read(memaddr, len(datastr))if readdata == datastr:sys.stdout.write("Data matches!")else:sys.stdout.write("Data does NOT match. Something got wrong")