# Rev 412 | Blame | Last modification | View Log | RSS feed  (source-viewer page header, not part of the program)
#!/usr/bin/env python
#
#
#    Copyright 2010 TheSeven, benedikt93, Farthen
#
#
#    This file is part of emBIOS.
#
#    emBIOS is free software: you can redistribute it and/or
#    modify it under the terms of the GNU General Public License as
#    published by the Free Software Foundation, either version 2 of the
#    License, or (at your option) any later version.
#
#    emBIOS is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#    See the GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with emBIOS.  If not, see <http://www.gnu.org/licenses/>.
#
#

"""
    Command line interface to communicate with emBIOS devices.
    Run it without arguments to see usage information.
"""

import sys
import os
import time
import struct
import locale

from functools import wraps

import libembios
import libembiosdata
from misc import Error, Logger, getfuncdoc, gethwname


# Python 2 has a separate "long" type; Python 3 does not.
try:
    _INTEGER_TYPES = (int, long)
except NameError:
    _INTEGER_TYPES = (int,)


class NotImplementedError(Error):
    """Raised by commands that exist but have no implementation yet."""
    pass


class ArgumentError(Error):
    """Raised by commands to reject their arguments and show the usage."""
    pass


class ArgumentTypeError(Error):
    """Raised when an argument cannot be converted to the expected type."""

    def __init__(self, expected, seen=False):
        # expected: description of the accepted type/values
        # seen: the offending value, or False if it is not known
        self.expected = expected
        self.seen = seen

    def __str__(self):
        if self.seen:
            return "Expected " + str(self.expected) + " but saw " + str(self.seen)
        else:
            return "Expected " + str(self.expected) + ", but saw something else"


def usage(errormsg=None, specific=False):
    """
        Prints the usage information.
        It is auto generated from various places.

        errormsg: optional error that is logged after the usage text
        specific: name of a single command to describe, False for all
        This function never returns, it exits with status 2.
    """
    logger = Logger()
    doc = getfuncdoc(Commandline.cmddict)
    if not specific:
        logger.log("Please provide a command and (if needed) parameters as command line arguments\n\n")
        logger.log("Available commands:\n\n")
    else:
        logger.log("\n")
    for function in sorted(doc):
        if specific == False or specific == function:
            logger.log(function + " ", 2)
            for arg in doc[function]['args']:
                logger.log("<" + arg + "> ")
            if doc[function]['kwargs']:
                for kwarg, kwvalue in doc[function]['kwargs'].items():
                    logger.log("[" + kwarg + " = " + str(kwvalue) + "] ")
            if doc[function]['varargs']:
                logger.log("<db1> ... <dbN>")
            logger.log("\n")
            if doc[function]['documentation']:
                logger.log(doc[function]['documentation'] + "\n", 4)
            logger.log("\n")
    logger.log("\n")

    if errormsg:
        logger.error(str(errormsg) + "\n")
    sys.exit(2)


def command(func):
    """
        Decorator for all commands.
        The decorated function is called with (self, all, other, arguments, ...)
        Only positional arguments are forwarded.
    """
    @wraps(func)
    def decorator(*args):
        return func(*args)
    # The marker is set on the undecorated function; commandClass finds it
    # through the "func" attribute of the wrapper.
    func._command = True
    decorator.func = func
    return decorator


def commandClass(cls):
    """
        Decorator for the class. Sets the self.cmddict of the class
        to all functions decorated with @command
    """
    cls.cmddict = {}
    # Iterate over a snapshot, the class namespace must not change under us.
    for value in list(vars(cls).values()):
        if getattr(value, 'func', False):
            if getattr(value.func, '_command', False):
                cls.cmddict[value.func.__name__] = value
    return cls


@commandClass
class Commandline(object):
    """
        If you want to create a new commandline function you just need to
        create a function with the name of it in this class and decorate
        it with the decorator @command. If you don't want to call the desired
        function (wrong arguments etc) just raise ArgumentError with or
        without an error message.
    """
    def __init__(self):
        self.logger = Logger()
        try:
            self.embios = libembios.Embios(loglevel = 2)
        except libembios.DeviceNotFoundError:
            self.logger.error("No emBIOS device found!")
            sys.exit(1)
        self.getinfo("version")

    def _parsecommand(self, func, args):
        """
            Looks up the command <func> and calls it with the strings in <args>.
            Argument problems end in usage(), device problems are logged.
        """
        if func in self.cmddict:
            try:
                # The commands are stored as plain functions, so they need
                # self passed explicitly.
                self.cmddict[func](self, *args)
            except (ArgumentError, libembios.ArgumentError) as e:
                # Fall back to a generic message if the error carries no text.
                usage(str(e) or "Syntax Error in function '" + func + "'", specific=func)
            except ArgumentTypeError as e:
                usage(e, specific=func)
            except NotImplementedError:
                self.logger.error("This function is not implemented yet!")
            except libembios.DeviceError as e:
                self.logger.error(str(e))
            except TypeError as e:
                # Only act on TypeErrors for the function we called, not on TypeErrors raised by another function.
                # Newer interpreters prefix the name with the class ("Commandline.ls()").
                if str(e).split(" ", 1)[0].split(".")[-1] == func + "()":
                    usage("Argument Error in '" + func + "': Wrong argument count", specific=func)
                else:
                    raise
            except libembios.usb.core.USBError:
                self.logger.error("There is a problem with the USB connection.")
        else:
            usage("No such command")

    @staticmethod
    def _bool(something):
        """
            Converts quite everything into bool.
            Raises ArgumentTypeError for anything that is not understood.
        """
        if isinstance(something, bool):
            return something
        if isinstance(something, _INTEGER_TYPES):
            return bool(something)
        if isinstance(something, str):
            truelist = ['true', '1', 't', 'y', 'yes']
            falselist = ['false', '0', 'f', 'n', 'no']
            if something.lower() in truelist:
                return True
            elif something.lower() in falselist:
                return False
        raise ArgumentTypeError("bool", "'" + str(something) + "'")

    @staticmethod
    def _hexint(something):
        """
            Converts quite everything to a hexadecimal represented integer.
            This works for default arguments too, because it returns
            None when it found that it got a NoneType object.
            Raises ArgumentTypeError for anything that is not understood.
        """
        if something is None:
            return None
        # bool is a subclass of int, but is not an accepted integer here.
        if isinstance(something, _INTEGER_TYPES) and not isinstance(something, bool):
            return something
        if isinstance(something, str):
            try:
                return int(something, 16)
            except ValueError:
                pass
        raise ArgumentTypeError("hexadecimal coded integer", "'" + str(something) + "'")

    @staticmethod
    def _hex(integer):
        """Formats <integer> as lowercase hexadecimal with 0x prefix."""
        return "0x%x" % integer

    def _bufferaddr(self, buffer):
        """
            Resolves the optional [buffer] argument of the transfer commands.
            False (the default) selects the start of the user memory range.
        """
        # Identity check: the address 0 compares equal to False but is valid.
        if buffer is False:
            return self.embios.lib.dev.usermem.lower
        return self._hexint(buffer)

    def _abortdelay(self, seconds=10):
        """Gives the user <seconds> seconds to abort a dangerous operation."""
        self.logger.warn("If this was not what you intended press Ctrl-C NOW")
        for i in range(seconds):
            self.logger.info(".")
            time.sleep(1)
        self.logger.info("\n")

    def _listdir(self, path):
        """
            Returns a list of all entries of the directory <path> on the device.
            The directory handle is closed again before returning.
        """
        handle = self.embios.dir_open(path)
        entries = []
        while True:
            try:
                entries.append(self.embios.dir_read(handle))
            except Exception:
                # NOTE(review): dir_read appears to signal the end of the
                # directory by raising; the exact exception type is not
                # visible from this file - confirm against libembios.
                break
        self.embios.dir_close(handle)
        return entries

    @command
    def getinfo(self, infotype):
        """
            Get info on the running emBIOS.
            <infotype> may be either of 'version', 'packetsize', 'usermemrange'.
        """
        dev = self.embios.lib.dev
        if infotype == "version":
            hwtype = gethwname(dev.hwtypeid)
            self.logger.info("Connected to " +
                             libembiosdata.swtypes[dev.swtypeid] +
                             " v" + str(dev.version.majorv) +
                             "." + str(dev.version.minorv) +
                             "." + str(dev.version.patchv) +
                             " r" + str(dev.version.revision) +
                             " running on " + hwtype + "\n")
        elif infotype == "packetsize":
            self.logger.info("Maximum packet sizes: \n command out: " + str(dev.packetsizelimit.cout) +
                             "\n command in: " + str(dev.packetsizelimit.cin) +
                             "\n data in: " + str(dev.packetsizelimit.din) +
                             "\n data out: " + str(dev.packetsizelimit.dout))
        elif infotype == "usermemrange":
            # The result is read back from lib.dev.usermem below.
            self.embios.getusermemrange()
            self.logger.info("The user memory range is " +
                             self._hex(self.embios.lib.dev.usermem.lower) +
                             " - " +
                             self._hex(self.embios.lib.dev.usermem.upper - 1))
        else:
            raise ArgumentTypeError("one out of 'version', 'packetsize', 'usermemrange'", infotype)

    @command
    def reset(self, force=False):
        """
            Resets the device
            If [force] is 1, the reset will be forced, otherwise it will be gracefully,
            which may take some time.
        """
        force = self._bool(force)
        if force: self.logger.info("Resetting forcefully...\n")
        else: self.logger.info("Resetting...\n")
        self.embios.reset(force)

    @command
    def poweroff(self, force=False):
        """
            Powers the device off
            If [force] is 1, the poweroff will be forced, otherwise it will be gracefully,
            which may take some time.
        """
        force = self._bool(force)
        if force: self.logger.info("Powering off forcefully...\n")
        else: self.logger.info("Powering off...\n")
        self.embios.poweroff(force)

    @command
    def uploadfile(self, addr, filename):
        """
            Uploads a file to the device
            <addr>: the address to upload the file to
            <filename>: the path to the file
        """
        addr = self._hexint(addr)
        try:
            f = open(filename, 'rb')
        except IOError:
            raise ArgumentError("File not readable. Does it exist?")
        with f:
            self.logger.info("Writing file '" + filename +
                             "' to memory at " + self._hex(addr) + "...")
            self.embios.write(addr, f.read())
        self.logger.info("done\n")

    @command
    def downloadfile(self, addr, size, filename):
        """
            Downloads a region of memory from the device into a file
            <addr>: the address to download the data from
            <size>: the number of bytes to be read
            <filename>: the path to the file
        """
        addr = self._hexint(addr)
        size = self._hexint(size)
        try:
            f = open(filename, 'wb')
        except IOError:
            raise ArgumentError("Can not open file for write!")
        with f:
            self.logger.info("Reading data from address " + self._hex(addr) +
                             " with the size " + self._hex(size) +
                             " to '" + filename + "'...")
            f.write(self.embios.read(addr, size))
        self.logger.info("done\n")

    @command
    def uploadint(self, addr, integer):
        """
            Uploads a single integer to the device
            <addr>: the address to upload the integer to
            <integer>: the integer to upload
        """
        addr = self._hexint(addr)
        integer = self._hexint(integer)
        if integer > 0xFFFFFFFF:
            raise ArgumentError("Specified integer too long")
        if integer < 0:
            # struct.pack("I", ...) only takes unsigned values.
            raise ArgumentError("Specified integer must not be negative")
        data = struct.pack("I", integer)
        self.embios.write(addr, data)
        self.logger.info("Integer '" + self._hex(integer) +
                         "' written successfully to " + self._hex(addr) + "\n")

    @command
    def downloadint(self, addr):
        """
            Downloads a single integer from the device and prints it to the console window
            <addr>: the address to download the integer from
        """
        addr = self._hexint(addr)
        data = self.embios.read(addr, 4)
        integer = struct.unpack("I", data)[0]
        self.logger.info("Integer '" + self._hex(integer) +
                         "' read from address " + self._hex(addr) + "\n")

    @command
    def i2cread(self, bus, slave, addr, size):
        """
            Reads data from an I2C device
            <bus>: the bus index
            <slave>: the slave address
            <addr>: the start address on the I2C device
            <size>: the number of bytes to read
        """
        bus = self._hexint(bus)
        slave = self._hexint(slave)
        addr = self._hexint(addr)
        size = self._hexint(size)
        data = self.embios.i2cread(bus, slave, addr, size)
        values = struct.unpack("%dB" % len(data), data)
        self.logger.info("Data read from I2C:\n")
        for index, byte in enumerate(values):
            self.logger.info("%02X: %02X\n" % (index, byte))

    @command
    def i2cwrite(self, bus, slave, addr, *args):
        """
            Writes data to an I2C device
            <bus>: the bus index
            <slave>: the slave address
            <addr>: the start address on the I2C device
            <db1> ... <dbN>: the data in single bytes, encoded in hex,
                separated by whitespaces, eg. 37 5A 4F EB
        """
        bus = self._hexint(bus)
        slave = self._hexint(slave)
        addr = self._hexint(addr)
        data = "".join([chr(self._hexint(arg)) for arg in args])
        self.logger.info("Writing data to I2C...\n")
        self.embios.i2cwrite(bus, slave, addr, data)
        self.logger.info("done\n")

    @command
    def console(self):
        """
            Reads data from the USB console continuously
        """
        while True:
            resp = self.embios.usbcread()
            self.logger.log(resp.data)
            # Sleep shorter the fuller the last response was.
            time.sleep(0.1 / resp.maxsize * (resp.maxsize - len(resp.data)))

    @command
    def writeusbconsole(self, *args):
        """
            Writes the string <db1> ... <dbN> to the USB console.
        """
        text = " ".join(args)
        self.logger.info("Writing '" + text + "' to the usb console\n")
        self.embios.usbcwrite(text)

    @command
    def readdevconsole(self, bitmask):
        """
            Reads data continuously from one or more of the device's consoles.
            <bitmask>: the bitmask of the consoles to read from.
        """
        bitmask = self._hexint(bitmask)
        # NOTE(review): <bitmask> is validated but never passed to cread();
        # confirm against libembios whether cread() should receive it.
        while True:
            resp = self.embios.cread()
            self.logger.log(resp.data)
            time.sleep(0.1 / resp.maxsize * (resp.maxsize - len(resp.data)))

    @command
    def writedevconsole(self, bitmask, *args):
        """
            Writes the string <db1> ... <dbN> to one or more of the device's consoles.
            <bitmask>: the bitmask of the consoles to write to
        """
        bitmask = self._hexint(bitmask)
        text = " ".join(args)
        self.logger.info("Writing '" + text +
                         "' to the device consoles identified with " + self._hex(bitmask) + "\n")
        self.embios.cwrite(text, bitmask)

    @command
    def flushconsolebuffers(self, bitmask):
        """
            flushes one or more of the device consoles' buffers.
            <bitmask>: the bitmask of the consoles to be flushed
        """
        bitmask = self._hexint(bitmask)
        self.logger.info("Flushing consoles identified with the bitmask " +
                         self._hex(bitmask) + "\n")
        self.embios.cflush(bitmask)

    @command
    def getprocinfo(self):
        """
            Fetches data on the currently running processes
        """
        import datetime
        threads = self.embios.getprocinfo()
        threadload = 0
        idleload = 0
        # Thread id 0 is accounted as idle load, everything else as user load.
        for thread in threads:
            if thread.id != 0:
                threadload += thread.cpuload / 255.
            else:
                idleload += thread.cpuload / 255.
        # Whatever is not attributed to a thread is reported as kernel load.
        coreload = 1 - (threadload + idleload)
        cpuload = threadload + coreload
        self.logger.info("Threads: %d, CPU load: %.1f%%, kernel load: %.1f%%, user load: %.1f%%\n\n"
                         % (len(threads), cpuload * 100, coreload * 100, threadload * 100))
        self.logger.info("Thread dump:\n")
        for thread in threads:
            self.logger.info(thread.name + ":\n", 2)
            self.logger.info("Thread id: " + str(thread.id) + "\n", 4)
            self.logger.info("Thread type: " + thread.type + "\n", 4)
            self.logger.info("Thread state: " + thread.state + "\n", 4)
            self.logger.info("Block type: " + thread.block_type + "\n", 4)
            self.logger.info("Blocked by: " + self._hex(thread.blocked_by_ptr) + "\n", 4)
            self.logger.info("Priority: " + str(thread.priority) + "/255\n", 4)
            self.logger.info("Current CPU load: %.1f%%\n" % ((thread.cpuload * 100) / 255.), 4)
            self.logger.info("CPU time (total): " + str(datetime.timedelta(microseconds = thread.cputime_total)) + "\n", 4)
            self.logger.info("Stack address: " + self._hex(thread.stackaddr) + "\n", 4)
            self.logger.info("Registers:\n", 4)
            # Four rows, each showing r<n>, r<n+4>, r<n+8>, r<n+12>.
            for registerrange in range(4):
                self.logger.info(" ")
                for register in range(registerrange, 16, 4):
                    registerrepr = "r" + str(register)
                    self.logger.info("{0:3s}: 0x{1:08X} ".format(registerrepr, thread.regs[registerrepr]))
                self.logger.info("\n")
            self.logger.info("cpsr: 0x{0:08X}".format(thread.regs.cpsr), 6)
            self.logger.info("\n")

    @command
    def lockscheduler(self):
        """
            Locks (freezes) the scheduler
        """
        self.logger.info("Will now lock scheduler\n")
        self.embios.lockscheduler()

    @command
    def unlockscheduler(self):
        """
            Unlocks (unfreezes) the scheduler
        """
        self.logger.info("Will now unlock scheduler\n")
        self.embios.unlockscheduler()

    @command
    def suspendthread(self, threadid):
        """
            Suspends the thread with thread ID <threadid>
        """
        threadid = self._hexint(threadid)
        self.logger.info("Suspending the thread with the threadid " + self._hex(threadid) + "\n")
        self.embios.suspendthread(threadid)

    @command
    def resumethread(self, threadid):
        """
            Resumes the thread with thread ID <threadid>
        """
        threadid = self._hexint(threadid)
        self.logger.info("Resuming the thread with the threadid " + self._hex(threadid) + "\n")
        self.embios.resumethread(threadid)

    @command
    def killthread(self, threadid):
        """
            Kills the thread with thread ID <threadid>
        """
        threadid = self._hexint(threadid)
        self.logger.info("Killing the thread with the threadid " + self._hex(threadid) + "\n")
        self.embios.killthread(threadid)

    @command
    def createthread(self, nameptr, entrypoint, stackptr, stacksize, threadtype, priority, state):
        """
            Creates a new thread and returns its thread ID
            <nameptr>: a pointer to the thread's name
            <entrypoint>: a pointer to the entrypoint of the thread
            <stackptr>: a pointer to the stack of the thread
            <stacksize>: the size of the thread's stack
            <threadtype>: the thread type, valid are: 0 => user thread, 1 => system thread
            <priority>: the priority of the thread, from 1 to 255
            <state>: the thread's initial state, valid are: 1 => ready, 0 => suspended
        """
        nameptr = self._hexint(nameptr)
        entrypoint = self._hexint(entrypoint)
        stackptr = self._hexint(stackptr)
        stacksize = self._hexint(stacksize)
        threadtype = self._hexint(threadtype)
        priority = self._hexint(priority)
        state = self._hexint(state)
        data = self.embios.createthread(nameptr, entrypoint, stackptr, stacksize, threadtype, priority, state)
        name = self.embios.readstring(nameptr)
        self.logger.info("Created a thread with the threadid " + str(data.id) +
                         ", the name \"" + name + "\"" +
                         ", the entrypoint at " + self._hex(entrypoint) +
                         ", the stack at " + self._hex(stackptr) +
                         " with a size of " + self._hex(stacksize) +
                         " and a priority of " + self._hex(priority) + "\n")

    @command
    def run(self, filename):
        """
            Uploads the emBIOS application <filename> to
            the memory and executes it
        """
        try:
            f = open(filename, 'rb')
        except IOError:
            raise ArgumentError("File not readable. Does it exist?")
        with f:
            data = self.embios.run(f.read())
        self.logger.info("Executed emBIOS application \"" + data.name + "\" at address " + self._hex(data.baseaddr))

    @command
    def execimage(self, addr):
        """
            Executes the emBIOS application at <addr>.
        """
        addr = self._hexint(addr)
        self.logger.info("Starting emBIOS app at " + self._hex(addr) + "\n")
        self.embios.execimage(addr)

    @command
    def flushcaches(self):
        """
            Flushes the CPUs data and instruction caches.
        """
        self.logger.info("Flushing CPU data and instruction caches...")
        self.embios.flushcaches()
        self.logger.info("done\n")

    @command
    def readbootflash(self, addr_flash, addr_mem, size):
        """
            Reads <size> bytes from bootflash to memory.
            <addr_flash>: the address in bootflash to read from
            <addr_mem>: the address in memory to copy the data to
        """
        addr_flash = self._hexint(addr_flash)
        addr_mem = self._hexint(addr_mem)
        size = self._hexint(size)
        self.logger.info("Dumping boot flash addresses " + self._hex(addr_flash) + " - " +
                         self._hex(addr_flash + size) + " to " +
                         self._hex(addr_mem) + " - " + self._hex(addr_mem + size) + "\n")
        self.embios.bootflashread(addr_mem, addr_flash, size)

    @command
    def writebootflash(self, addr_flash, addr_mem, size, force=False):
        """
            Writes <size> bytes from memory to bootflash.
            ATTENTION: Don't call this unless you really know what you're doing!
            This may BRICK your device (unless it has a good recovery option)
            <addr_flash>: the address in bootflash to write to
            <addr_mem>: the address in memory to copy the data from
            [force]: Use this flag to suppress the 10 seconds delay
        """
        addr_flash = self._hexint(addr_flash)
        addr_mem = self._hexint(addr_mem)
        size = self._hexint(size)
        force = self._bool(force)
        self.logger.warn("Writing boot flash from the memory in " + self._hex(addr_mem) + " - " +
                         self._hex(addr_mem + size) + " to " +
                         self._hex(addr_flash) + " - " + self._hex(addr_flash + size) + "\n")
        if not force:
            self._abortdelay()
        self.embios.bootflashwrite(addr_mem, addr_flash, size)

    @command
    def runfirmware(self, addr, filename):
        """
            Uploads the firmware in <filename>
            to the address at <addr> and executes it.
        """
        addr = self._hexint(addr)
        self.uploadfile(addr, filename)
        self.execfirmware(addr)

    @command
    def execfirmware(self, addr):
        """
            Executes the firmware at <addr>
        """
        addr = self._hexint(addr)
        self.logger.info("Running firmware at " + self._hex(addr) + ". Bye.")
        self.embios.execfirmware(addr)

    @command
    def aesencrypt(self, addr, size, keyindex):
        """
            Encrypts a buffer using a hardware key
            <addr>: the starting address of the buffer
            <size>: the size of the buffer
            <keyindex>: the index of the key in the crypto unit
        """
        addr = self._hexint(addr)
        size = self._hexint(size)
        keyindex = self._hexint(keyindex)
        self.embios.aesencrypt(addr, size, keyindex)

    @command
    def aesdecrypt(self, addr, size, keyindex):
        """
            Decrypts a buffer using a hardware key
            <addr>: the starting address of the buffer
            <size>: the size of the buffer
            <keyindex>: the index of the key in the crypto unit
        """
        addr = self._hexint(addr)
        size = self._hexint(size)
        keyindex = self._hexint(keyindex)
        self.embios.aesdecrypt(addr, size, keyindex)

    @command
    def hmac_sha1(self, addr, size, destination):
        """
            Generates a HMAC-SHA1 hash of the buffer
            <addr>: the starting address of the buffer
            <size>: the size of the buffer
            <destination>: the location where the hash will be stored
        """
        addr = self._hexint(addr)
        size = self._hexint(size)
        destination = self._hexint(destination)
        sha1size = 0x14
        self.logger.info("Generating hmac-sha1 hash from the buffer at " + self._hex(addr) +
                         " with the size " + self._hex(size) + " and saving it to " +
                         self._hex(destination) + " - " + self._hex(destination + sha1size) + "...")
        self.embios.hmac_sha1(addr, size, destination)
        self.logger.info("done\n")
        data = self.embios.read(destination, sha1size)
        # The hash is a multi-byte buffer, so it is printed byte by byte.
        digest = "".join(["%02x" % byte for byte in struct.unpack("%dB" % len(data), data)])
        self.logger.info("The generated hash is 0x" + digest)

    @command
    def ipodnano2g_getnandinfo(self):
        """
            Target-specific function: ipodnano2g
            Gathers some information about the NAND chip used
        """
        data = self.embios.ipodnano2g_getnandinfo()
        self.logger.info("NAND chip type: " + self._hex(data["type"]) + "\n")
        self.logger.info("Number of banks: " + str(data["banks"]) + "\n")
        self.logger.info("Number of blocks: " + str(data["blocks"]) + "\n")
        self.logger.info("Number of user blocks: " + str(data["userblocks"]) + "\n")
        self.logger.info("Pages per block: " + str(data["pagesperblock"]))

    @command
    def ipodnano2g_nandread(self, addr, start, count, doecc=True, checkempty=True):
        """
            Target-specific function: ipodnano2g
            Reads data from the NAND chip into memory
            <addr>: the memory location where the data is written to
            <start>: start block
            <count>: block count
            [doecc]: use ecc error correction data
            [checkempty]: set statusflags if pages are empty
        """
        addr = self._hexint(addr)
        start = self._hexint(start)
        count = self._hexint(count)
        doecc = self._bool(doecc)
        checkempty = self._bool(checkempty)
        self.logger.info("Reading " + self._hex(count) + " NAND pages starting at " +
                         self._hex(start) + " to " + self._hex(addr) + "...")
        self.embios.ipodnano2g_nandread(addr, start, count, doecc, checkempty)
        self.logger.info("done\n")

    @command
    def ipodnano2g_nandwrite(self, addr, start, count, doecc=True):
        """
            Target-specific function: ipodnano2g
            Writes data to the NAND chip
            <addr>: the memory location where the data is read from
            <start>: start block
            <count>: block count
            [doecc]: create ecc error correction data
        """
        addr = self._hexint(addr)
        start = self._hexint(start)
        count = self._hexint(count)
        doecc = self._bool(doecc)
        self.logger.info("Writing " + self._hex(count) + " NAND pages starting at " +
                         self._hex(start) + " from " + self._hex(addr) + "...")
        self.embios.ipodnano2g_nandwrite(addr, start, count, doecc)
        self.logger.info("done\n")

    @command
    def ipodnano2g_nanderase(self, addr, start, count):
        """
            Target-specific function: ipodnano2g
            Erases blocks on the NAND chip and stores the results to memory
            <addr>: the memory location where the results are stored
            <start>: start block
            <count>: block count
        """
        addr = self._hexint(addr)
        start = self._hexint(start)
        count = self._hexint(count)
        self.logger.info("Erasing " + self._hex(count) + " NAND blocks starting at " +
                         self._hex(start) + " and logging to " + self._hex(addr) + "...")
        self.embios.ipodnano2g_nanderase(addr, start, count)
        self.logger.info("done\n")

    @command
    def ipodnano2g_dumpnand(self, filenameprefix):
        """
            Target-specific function: ipodnano2g
            Dumps the whole NAND chip to four files
            <filenameprefix>: prefix of the files that will be created
        """
        info = self.embios.ipodnano2g_getnandinfo()
        self.logger.info("Dumping NAND contents...")
        files = []
        try:
            try:
                for suffix in ("_info.txt", "_data.bin", "_spare.bin", "_status.bin"):
                    files.append(open(filenameprefix + suffix, 'wb'))
            except IOError:
                raise ArgumentError("Can not open file for writing!")
            infofile, datafile, sparefile, statusfile = files
            infofile.write("NAND chip type: " + self._hex(info["type"]) + "\r\n")
            infofile.write("Number of banks: " + str(info["banks"]) + "\r\n")
            infofile.write("Number of blocks: " + str(info["blocks"]) + "\r\n")
            infofile.write("Number of user blocks: " + str(info["userblocks"]) + "\r\n")
            infofile.write("Pages per block: " + str(info["pagesperblock"]) + "\r\n")
            # The chip is read in chunks of 8192 pages into fixed memory regions.
            for i in range(info["banks"] * info["blocks"] * info["pagesperblock"] // 8192):
                self.logger.info(".")
                self.embios.ipodnano2g_nandread(0x08000000, i * 8192, 8192, 1, 1)
                datafile.write(self.embios.read(0x08000000, 0x01000000))
                sparefile.write(self.embios.read(0x09000000, 0x00080000))
                statusfile.write(self.embios.read(0x09080000, 0x00008000))
        finally:
            # Close whatever was opened, even if opening or dumping failed.
            for f in files:
                f.close()
        self.logger.info("done\n")

    @command
    def ipodnano2g_wipenand(self, filename, force=False):
        """
            Target-specific function: ipodnano2g
            Wipes the whole NAND chip and logs the result to a file
            <filename>: location of the log file
            [force]: use this flag to suppress the 10 seconds delay
        """
        # Without the conversion every string, even "0", would skip the delay.
        force = self._bool(force)
        self.logger.warn("Wiping the whole NAND chip!\n")
        if not force:
            self._abortdelay()
        info = self.embios.ipodnano2g_getnandinfo()
        self.logger.info("Wiping NAND contents...")
        try:
            statusfile = open(filename, 'wb')
        except IOError:
            raise ArgumentError("Can not open file for writing!")
        with statusfile:
            # Erase in chunks of 64 blocks, logging 0x100 bytes of result each.
            for i in range(info["banks"] * info["blocks"] // 64):
                self.logger.info(".")
                self.embios.ipodnano2g_nanderase(0x08000000, i * 64, 64)
                statusfile.write(self.embios.read(0x08000000, 0x00000100))
        self.logger.info("done\n")

    @command
    def ipodclassic_writebbt(self, tempaddr, filename):
        """
            Target-specific function: ipodclassic
            Uploads the bad block table <filename> to
            memory at <tempaddr> and writes it to the hard disk
        """
        tempaddr = self._hexint(tempaddr)
        try:
            f = open(filename, 'rb')
        except IOError:
            raise ArgumentError("File not readable. Does it exist?")
        with f:
            self.logger.info("Writing bad block table to disk...")
            self.embios.ipodclassic_writebbt(f.read(), tempaddr)
        self.logger.info(" done\n")

    @command
    def getvolumeinfo(self, volume):
        """
            Gathers some information about a storage volume used
            <volume>: volume id
        """
        volume = self._hexint(volume)
        data = self.embios.storage_get_info(volume)
        self.logger.info("Sector size: " + str(data["sectorsize"]) + "\n")
        self.logger.info("Number of sectors: " + str(data["numsectors"]) + "\n")
        self.logger.info("Vendor: " + data["vendor"] + "\n")
        self.logger.info("Product: " + data["product"] + "\n")
        self.logger.info("Revision: " + data["revision"])

    @command
    def readrawstorage(self, volume, sector, count, addr):
        """
            Reads <count> sectors starting at <sector> from storage <volume> to memory at <addr>.
        """
        volume = self._hexint(volume)
        sector = self._hexint(sector)
        count = self._hexint(count)
        addr = self._hexint(addr)
        self.logger.info("Reading volume %s sectors %X - %X to %08X..." % (volume, sector, sector + count - 1, addr))
        self.embios.storage_read_sectors_md(volume, sector, count, addr)
        self.logger.info("done\n")

    @command
    def writerawstorage(self, volume, sector, count, addr):
        """
            Writes memory contents at <addr> to <count> sectors starting at <sector> on storage <volume>.
        """
        volume = self._hexint(volume)
        sector = self._hexint(sector)
        count = self._hexint(count)
        addr = self._hexint(addr)
        self.logger.info("Writing %08X to volume %s sectors %X - %X..." % (addr, volume, sector, sector + count - 1))
        self.embios.storage_write_sectors_md(volume, sector, count, addr)
        self.logger.info("done\n")

    @command
    def readrawstoragefile(self, volume, sector, count, file, buffer = False, buffsize = "100000"):
        """
            Reads <count> sectors starting at <sector> from storage <volume> to file <file>,
            buffering them in memory at [buffer] in chunks of [buffsize] bytes (both optional).
        """
        volume = self._hexint(volume)
        sector = self._hexint(sector)
        count = self._hexint(count)
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        try:
            f = open(file, 'wb')
        except IOError:
            raise ArgumentError("Could not open local file for writing.")
        with f:
            self.logger.info("Reading volume %s sectors %X - %X to %s..." % (volume, sector, sector + count - 1, file))
            storageinfo = self.embios.storage_get_info(volume)
            chunksectors = buffsize // storageinfo.sectorsize
            if chunksectors < 1:
                # Zero sectors per round would never make progress.
                raise ArgumentError("The buffer size is smaller than the sector size")
            while count > 0:
                sectors = min(count, chunksectors)
                self.embios.storage_read_sectors_md(volume, sector, sectors, buffer)
                f.write(self.embios.read(buffer, storageinfo.sectorsize * sectors))
                sector = sector + sectors
                count = count - sectors
        self.logger.info("done\n")

    @command
    def writerawstoragefile(self, volume, sector, count, file, buffer = False, buffsize = "100000"):
        """
            Writes contents of <file> to <count> sectors starting at <sector> on storage <volume>,
            buffering them in memory at [buffer] in chunks of [buffsize] bytes (both optional).
        """
        volume = self._hexint(volume)
        sector = self._hexint(sector)
        count = self._hexint(count)
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        try:
            f = open(file, 'rb')
        except IOError:
            raise ArgumentError("Could not open local file for reading.")
        with f:
            self.logger.info("Writing %s to volume %s sectors %X - %X..." % (file, volume, sector, sector + count - 1))
            storageinfo = self.embios.storage_get_info(volume)
            sectorsize = storageinfo.sectorsize
            chunksectors = buffsize // sectorsize
            if chunksectors < 1:
                # Zero sectors per round would never make progress.
                raise ArgumentError("The buffer size is smaller than the sector size")
            while count > 0:
                sectors = min(count, chunksectors)
                data = f.read(sectorsize * sectors)
                if len(data) == 0: break
                # A short read means the file ended. Pad the tail with NUL
                # bytes up to the sector boundary instead of waiting forever
                # for data that will never come.
                remainder = len(data) % sectorsize
                if remainder:
                    data = data + b"\0" * (sectorsize - remainder)
                sectors = len(data) // sectorsize
                self.embios.write(buffer, data)
                self.embios.storage_write_sectors_md(volume, sector, sectors, buffer)
                sector = sector + sectors
                count = count - sectors
        self.logger.info("done\n")

    @command
    def mkdir(self, dirname):
        """
            Creates a directory with the name <dirname>
        """
        self.logger.info("Creating directory " + dirname + "...")
        self.embios.dir_create(dirname)
        self.logger.info(" done\n")

    @command
    def rmdir(self, dirname):
        """
            Removes an empty directory with the name <dirname>
        """
        self.logger.info("Removing directory " + dirname + "...")
        self.embios.dir_remove(dirname)
        self.logger.info(" done\n")

    @command
    def rm(self, filename):
        """
            Removes a file with the name <filename>
        """
        self.logger.info("Removing file " + filename + "...")
        self.embios.file_unlink(filename)
        self.logger.info(" done\n")

    @command
    def rmtree(self, path):
        """
            Recursively removes a folder
            <path>: the folder to be removed
        """
        for entry in self._listdir(path):
            if entry.name == "." or entry.name == "..":
                continue
            elif entry.attributes & 0x10:
                # Attribute bit 0x10 marks a directory.
                self.rmtree(path + "/" + entry.name)
            else:
                self.rm(path + "/" + entry.name)
        self.rmdir(path)

    @command
    def mv(self, oldname, newname):
        """
            Renames or moves file or directory <oldname> to <newname>
        """
        self.logger.info("Renaming " + oldname + " to " + newname + "...")
        self.embios.file_rename(oldname, newname)
        self.logger.info(" done\n")

    @command
    def get(self, remotename, localname, buffer = False, buffsize = "10000"):
        """
            Downloads a file
            <remotename>: filename on the device
            <localname>: filename on the computer
            [buffer]: buffer address (optional)
            [buffsize]: buffer size (optional)
        """
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        try:
            f = open(localname, 'wb')
        except IOError:
            raise ArgumentError("Could not open local file for writing.")
        with f:
            self.logger.info("Downloading file " + remotename + " to " + localname + "...")
            fd = self.embios.file_open(remotename, 0)
            size = self.embios.file_size(fd)
            while size > 0:
                received = self.embios.file_read(fd, buffer, buffsize)
                # Stop rather than spin if the device returns no more data.
                if not received: break
                f.write(self.embios.read(buffer, received))
                size = size - received
            self.embios.file_close(fd)
        self.logger.info(" done\n")

    @command
    def gettree(self, remotepath, localpath, buffer = False, buffsize = "10000"):
        """
            Downloads a directory tree
            <remotepath>: path on the device
            <localpath>: path on the computer
            [buffer]: buffer address (optional)
            [buffsize]: buffer size (optional)
        """
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        # The local directory may already exist, that is fine.
        try: os.mkdir(localpath)
        except OSError: pass
        for entry in self._listdir(remotepath):
            if entry.name == "." or entry.name == "..":
                continue
            elif entry.attributes & 0x10:
                self.gettree(remotepath + "/" + entry.name, localpath + "/" + entry.name, buffer, buffsize)
            else:
                self.get(remotepath + "/" + entry.name, localpath + "/" + entry.name, buffer, buffsize)

    @command
    def put(self, localname, remotename, buffer = False, buffsize = "10000"):
        """
            Uploads a file
            <localname>: filename on the computer
            <remotename>: filename on the device
            [buffer]: buffer address (optional)
            [buffsize]: buffer size (optional)
        """
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        try:
            f = open(localname, 'rb')
        except IOError:
            raise ArgumentError("Could not open local file for reading.")
        with f:
            self.logger.info("Uploading file " + localname + " to " + remotename + "...")
            fd = self.embios.file_open(remotename, 0x15)
            while True:
                data = f.read(buffsize)
                if len(data) == 0: break
                self.embios.write(buffer, data)
                # file_write may take less than requested, so keep going
                # until the whole chunk is on the device.
                written = 0
                while written < len(data):
                    written = written + self.embios.file_write(fd, buffer + written, len(data) - written)
            self.embios.file_close(fd)
        self.logger.info(" done\n")

    @command
    def puttree(self, localpath, remotepath, buffer = False, buffsize = "10000"):
        """
            Uploads a directory tree
            <localpath>: path on the computer
            <remotepath>: path on the device
            [buffer]: buffer address (optional)
            [buffsize]: buffer size (optional)
        """
        buffer = self._bufferaddr(buffer)
        buffsize = self._hexint(buffsize)
        # Creating a directory is best effort, it may already exist.
        try: self.mkdir(remotepath)
        except Exception: self.logger.info(" failed\n")
        pathlen = len(localpath)
        for dirpath, dirnames, filenames in os.walk(localpath):
            prefix = remotepath + "/" + dirpath.replace("\\", "/")[pathlen:] + "/"
            for dirname in dirnames:
                if dirname != ".svn":
                    try: self.mkdir(prefix + dirname)
                    except Exception: self.logger.info(" failed\n")
            # Version control metadata is not uploaded.
            if "/.svn/" in prefix:
                continue
            for filename in filenames:
                self.put(dirpath + "/" + filename, prefix + filename, buffer, buffsize)

    @command
    def ls(self, path = "/"):
        """
            Lists all files in the specified path
            [path]: the path which is listed
        """
        entries = self._listdir(path)
        self.logger.info("Directory listing of " + path + ":\n")
        for entry in entries:
            if entry.attributes & 0x10: size = "DIR"
            else: size = locale.format_string("%d", entry.size, True).rjust(13)
            self.logger.info(entry.name.ljust(50) + " - " + size + "\n")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        usage("No command specified")
    try:
        interface = Commandline()
        interface._parsecommand(sys.argv[1], sys.argv[2:])
    except KeyboardInterrupt:
        sys.exit()