diff --git a/ekgplotter/ekgplotter/main.py b/ekgplotter/ekgplotter/main.py index dbc1ba2..249e540 100644 --- a/ekgplotter/ekgplotter/main.py +++ b/ekgplotter/ekgplotter/main.py @@ -24,62 +24,43 @@ from __future__ import absolute_import - -from datetime import datetime -import threading -import Queue -import traceback -import logging -import numpy as np -import string -import time -import random -import socket -import os.path -from os import curdir, sep from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -from SocketServer import ThreadingMixIn, ForkingMixIn -import select -import re - -from collections import deque - - -from PyQt4.QtCore import QBuffer, QByteArray, QIODevice -from PyQt4 import QtGui, QtCore - -import pyqtgraph as pg - -from pyqtgraph.widgets.PlotWidget import PlotWidget - from chaosc.argparser_groups import * from chaosc.lib import logger, resolve_host - +from collections import deque, defaultdict +from datetime import datetime +from operator import attrgetter +from os import curdir, sep +from PyQt4 import QtGui, QtCore +from PyQt4.QtCore import QBuffer, QByteArray, QIODevice +from SocketServer import ThreadingMixIn, ForkingMixIn +import cPickle +import logging +import numpy as np +import os.path +import pyqtgraph as pg +import Queue +import random +import re +import select +import socket +import string +import threading +import time +import traceback fh = logging.FileHandler(os.path.expanduser("~/.chaosc/ekgplotter.log")) fh.setLevel(logging.DEBUG) logger.addHandler(fh) + try: from chaosc.c_osc_lib import OSCMessage, decode_osc except ImportError as e: - print(e) + logging.exception(e) from chaosc.osc_lib import OSCMessage, decode_osc - -class PlotWindow(PlotWidget): - def __init__(self, title=None, **kargs): - self.win = QtGui.QMainWindow() - PlotWidget.__init__(self, **kargs) - self.win.setCentralWidget(self) - for m in ['resize']: - setattr(self, m, getattr(self.win, m)) - if title is not None: - self.win.setWindowTitle(title) - - - class OSCThread(threading.Thread): def __init__(self, args): super(OSCThread, self).__init__() @@ -94,24 +75,13 @@ class OSCThread(threading.Thread): self.osc_sock.bind(self.client_address) self.osc_sock.setblocking(0) - print "%s: starting up osc receiver on '%s:%d'" % ( + logging.info("%s: starting up osc receiver on '%s:%d'", datetime.now().strftime("%x %X"), self.client_address[0], self.client_address[1]) - #self.subscribe_me() + self.subscribe_me() def subscribe_me(self): - """Use this procedure for a quick'n dirty subscription to your chaosc instance. - - :param chaosc_address: (chaosc_host, chaosc_port) - :type chaosc_address: tuple - - :param receiver_address: (host, port) - :type receiver_address: tuple - - :param token: token to get authorized for subscription - :type token: str - """ - print "%s: subscribing to '%s:%d' with label %r" % (datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1], self.args.subscriber_label) + logging.info("%s: subscribing to '%s:%d' with label %r", datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1], self.args.subscriber_label) msg = OSCMessage("/subscribe") msg.appendTypedArg(self.client_address[0], "s") msg.appendTypedArg(self.client_address[1], "i") @@ -125,7 +95,7 @@ class OSCThread(threading.Thread): if self.args.keep_subscribed: return - print "%s: unsubscribing from '%s:%d'" % (datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1]) + logging.info("%s: unsubscribing from '%s:%d'", datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1]) msg = OSCMessage("/unsubscribe") msg.appendTypedArg(self.client_address[0], "s") msg.appendTypedArg(self.client_address[1], "i") @@ -136,9 +106,9 @@ class OSCThread(threading.Thread): while self.running: try: - reads, writes, errs = select.select([self.osc_sock], [], [], 0.01) + reads, writes, errs = select.select([self.osc_sock], [], [], 0.005) except Exception, e: - print "select error", e + logging.exception(e) pass else: if reads: @@ -147,106 +117,66 @@ class OSCThread(threading.Thread): osc_address, typetags, messages = decode_osc(osc_input, 0, len(osc_input)) queue.put_nowait((osc_address, messages)) except Exception, e: - print "recvfrom error", e - #else: - #queue.put_nowait(("/bjoern/ekg", [0])) - #queue.put_nowait(("/merle/ekg", [0])) - #queue.put_nowait(("/uwe/ekg", [0])) + logging.info(e) - #self.unsubscribe_me() - print "OSCThread is going down" + + self.unsubscribe_me() + self.osc_sock.close() + logging.info("OSCThread is going down") queue = Queue.Queue() class Actor(object): - shadowPen = pg.mkPen(255, 255, 255) - brush = pg.mkBrush("w") - def __init__(self, name, num_data, color): - self.data = [0] * num_data - self.data_pointer = 0 + def __init__(self, name, num_data, color, ix, max_actors, actor_height): self.name = name - self.active = True - self.plotItem = pg.PlotCurveItem(pen=pg.mkPen(color, width=3), name=name) self.num_data = num_data - #self.plotItem.setShadowPen(pen=Actor.shadowPen, width=3, cosmetic=True) - self.plotPoint = pg.ScatterPlotItem(pen=Actor.shadowPen, brush=self.brush, size=5) + self.color = color + self.ix = ix + self.max_actors = max_actors + self.actor_height = actor_height + self.updated = 0 + + self.offset = ix * actor_height + self.data = np.array([self.offset] * num_data) + self.head = 0 + self.pre_head = 0 + self.plotItem = pg.PlotCurveItem(pen=pg.mkPen(color, width=3), name=name) + self.plotPoint = pg.ScatterPlotItem(pen=pg.mkPen("w", width=5), brush=pg.mkBrush(color), size=5) def __str__(self): - return "" % (self.name, self.active, self.data_pointer) + return "" % (self.name, self.active, self.head) __repr__ = __str__ - def scale_data(self, ix, max_items): - scale = 255 / max_items * ix - return [value / max_items + scale for value in self.data] - def set_point(self, value, ix, max_items): - scale = 255 / max_items * ix - self.plotPoint.setData(x = [self.data_pointer], y = [value / max_items + scale]) + def add_value(self, value): + dp = self.head + self.data[dp] = value / self.max_actors + self.offset + self.pre_head = dp + self.head = (dp + 1) % self.num_data + self.updated += 1 - #def find_max_value(self, item_data): - #max_index = -1 - #for ix, i in enumerate(item_data): - #if i > 250: - #return ix, i - #return None, None + def fill_missing(self, count): + dp = self.head + for i in range(count): + self.data[dp] = self.offset + dp = (dp + 1) % self.num_data + self.updated += 1 + self.pre_head = (dp - 1) % self.num_data + self.head = dp - #def rearrange(self, item_data, actual_pos, max_items): - #max_value_index, max_value = find_max_value(item_data) - #if max_value_index is None: - #return actual_pos - #mean = int(max_items / 2.) - #start = mean - max_value_index - #if start != 0: - #item_data.rotate(start) - #pos = (actual_pos + start) % max_items - #else: - #pos = actual_pos - #print "rearrange", mean, start, actual_pos, pos, item_data - #return pos + def render(self): + self.plotItem.setData(y=self.data, clear=True) + self.plotPoint.setData(x=[self.pre_head], y = [self.data[self.pre_head]]) - def set_value(self, value): - self.data[self.data_pointer] = value - self.data_pointer = (self.data_pointer + 1) % self.num_data - - #def resize(item_data, max_length, new_max_length, pos): - #print "resize", max_length, new_max_length - #if new_max_length < 15: - #return max_length, pos - - #if new_max_length > max_length: - #pad = (new_max_length - max_length) - #print "pad", pad - #for i in range(pad): - #if i % 2 == 0: - #item_data.append(0) - #else: - #item_data.appendleft(0) - #pos += 1 - #return new_max_length, pos - #elif new_max_length < max_length: - #pad = (max_length - new_max_length) - #for i in range(pad): - #if i % 2 == 0: - #item_data.pop() - #if pos >= new_max_length: - #pos = 0 - #else: - #item_data.popleft() - #if pos > 0: - #pos -= 1 - #return new_max_length, pos - #return max_length, pos - class EkgPlot(object): def __init__(self, actor_names, num_data, colors): self.plot = pg.PlotWidget() + #self.plot.setConfigOptions(antialias=True) self.plot.hide() - #self.plot.setLabel('left', "

Amplitude

") - #self.plot.setLabel('bottom', "

Time

") self.plot.showGrid(False, False) self.plot.setYRange(0, 255) self.plot.setXRange(0, num_data) @@ -256,14 +186,19 @@ class EkgPlot(object): bl = self.plot.getAxis("left") ba.setTicks([]) bl.setTicks([]) + ba.hide() + bl.hide() self.active_actors = list() self.actors = dict() self.lengths1 = [0] self.num_data = num_data - for actor_name, color in zip(actor_names, colors): - self.add_actor(actor_name, num_data, color) + self.max_value = 255 + self.max_actors = len(actor_names) + self.actor_height = self.max_value / self.max_actors + for ix, (actor_name, color) in enumerate(zip(actor_names, colors)): + self.add_actor(actor_name, num_data, color, ix, self.max_actors, self.actor_height) self.set_positions() @@ -272,8 +207,8 @@ class EkgPlot(object): self.updated_actors = set() - def add_actor(self, actor_name, num_data, color): - actor_obj = Actor(actor_name, num_data, color) + def add_actor(self, actor_name, num_data, color, ix, max_actors, actor_height): + actor_obj = Actor(actor_name, num_data, color, ix, max_actors, actor_height) self.actors[actor_name] = actor_obj self.plot.addItem(actor_obj.plotItem) self.plot.addItem(actor_obj.plotPoint) @@ -282,61 +217,43 @@ class EkgPlot(object): def set_positions(self): for ix, actor_obj in enumerate(self.active_actors): - actor_obj.plotItem.setPos(0, ix * 6) - actor_obj.plotPoint.setPos(0, ix * 6) + actor_obj.plotItem.setPos(0, ix * 2) + actor_obj.plotPoint.setPos(0, ix * 2) def active_actor_count(self): - return len(self.active_actors) + return self.max_actors + + def new_round(self): + for ix, actor in enumerate(self.active_actors): + actor.updated = 0 + + def update_missing_actors(self): + liste = sorted(self.active_actors, key=attrgetter("updated")) + max_values = liste[-1].updated + if max_values == 0: + # handling no signal + for actor in self.active_actors: + actor.add_value(0) + return + for ix, actor in enumerate(self.active_actors): + diff = max_values - actor.updated + if diff > 0: + for i in range(diff): + actor.add_value(0) + def update(self, osc_address, value): - #print "update", osc_address res = self.ekg_regex.match(osc_address) if res: - #print("matched data") actor_name = res.group(1) actor_obj = self.actors[actor_name] - max_actors = len(self.active_actors) - actor_data = actor_obj.data - data_pointer = actor_obj.data_pointer - actor_data[data_pointer] = value - try: - ix = self.active_actors.index(actor_obj) - actor_obj.set_point(value, ix, max_actors) - actor_obj.plotItem.setData(y=np.array(actor_obj.scale_data(ix, max_actors)), clear=True) - except ValueError as e: - #print("data", e) - pass - actor_obj.data_pointer = (data_pointer + 1) % self.num_data - return + actor_obj.add_value(value) - res = self.ctl_regex.match(osc_address) - if res: - print "received cmd", osc_address - actor_name = res.group(1) - actor_obj = self.actors[actor_name] - if value == 1 and not actor_obj.active: - print "actor on", actor_name, self.active_actors - self.plot.addItem(actor_obj.plotItem) - self.plot.addItem(actor_obj.plotPoint) - actor_obj.active = True - if not actor_obj in self.active_actors: - self.active_actors.append(actor_obj) - elif value == 0 and actor_obj.active: - print "actor off", actor_name, self.active_actors - actor_obj.active = False - self.plot.removeItem(actor_obj.plotItem) - self.plot.removeItem(actor_obj.plotPoint) - try: - self.active_actors.remove(actor_obj) - except ValueError as e: - print "active actors error", e, self.active_actors - pass - assert actor_obj not in self.active_actors - else: - print "internal data not in sync", self.active_actors, actor_obj - self.set_positions() + def render(self): + for ix, actor in enumerate(self.active_actors): + actor.render() class MyHandler(BaseHTTPRequestHandler): @@ -348,7 +265,6 @@ class MyHandler(BaseHTTPRequestHandler): if self.path=="" or self.path==None or self.path[:1]==".": self.send_error(403,'Forbidden') - if self.path.endswith(".html"): directory = os.path.dirname(os.path.abspath(__file__)) data = open(os.path.join(directory, self.path), "rb").read() @@ -360,83 +276,66 @@ class MyHandler(BaseHTTPRequestHandler): self.thread = thread = OSCThread(self.server.args) thread.daemon = True thread.start() - actor_names = ["merle", "bjoern", "uwe"] + actor_names = ["bjoern", "uwe", "merle"] num_data = 100 colors = ["r", "g", "b"] qtapp = QtGui.QApplication([]) plotter = EkgPlot(actor_names, num_data, colors) self.send_response(200) - self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=--aaboundary") + self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=--2342") self.end_headers() - #lastTime = time.time() - #fps = None event_loop = QtCore.QEventLoop() + last_frame = time.time() - 1.0 + frame_rate = 13.0 + frame_length = 1. / frame_rate + plotter.new_round() while 1: event_loop.processEvents() qtapp.sendPostedEvents(None, 0) while 1: try: osc_address, args = queue.get_nowait() + plotter.update(osc_address, args[0]) except Queue.Empty: break - else: - plotter.update(osc_address, args[0]) - exporter = pg.exporters.ImageExporter.ImageExporter(plotter.plot.plotItem) - exporter.parameters()['width'] = 768 - img = exporter.export("tmpfile", True) - buffer = QBuffer() - buffer.open(QIODevice.WriteOnly) - img.save(buffer, "JPG") - img.save("/tmp/test2.jpg", "JPG") + now = time.time() + delta = now - last_frame + if delta > frame_length: + plotter.update_missing_actors() + plotter.render() + exporter = pg.exporters.ImageExporter.ImageExporter(plotter.plot.plotItem) + exporter.parameters()['width'] = 768 + img = exporter.export(toBytes=True) + buffer = QBuffer() + buffer.open(QIODevice.WriteOnly) + img.save(buffer, "JPG") + JpegData = buffer.data() + self.wfile.write("--2342\r\nContent-Type: image/jpeg\r\nContent-length: %d\r\n\r\n%s\r\n\r\n\r\n" % (len(JpegData), JpegData)) + last_frame = now + plotter.new_round() + #JpegData = None + #buffer = None + #img = None + #exporter = None + time.sleep(0.01) - JpegData = buffer.data() - self.wfile.write("--aaboundary\r\nContent-Type: image/jpeg\r\nContent-length: %d\r\n\r\n%s\r\n\r\n\r\n" % (len(JpegData), JpegData)) - - JpegData = None - buffer = None - img = None - exporter = None - #now = time.time() - #dt = now - lastTime - #lastTime = now - #if fps is None: - #fps = 1.0/dt - #else: - #s = np.clip(dt*3., 0, 1) - #fps = fps * (1-s) + (1.0/dt) * s - #print '%0.2f fps' % fps - time.sleep(0.05) - - elif self.path.endswith(".jpeg"): - directory = os.path.dirname(os.path.abspath(__file__)) - data = open(os.path.join(directory, self.path), "rb").read() - self.send_response(200) - self.send_header('Content-type','image/jpeg') - self.end_headers() - self.wfile.write(data) - return - except (KeyboardInterrupt, SystemError): - print "queue size", queue.qsize() - if hasattr(self, "thread") and self.thread is not None: - self.thread.running = False - self.thread.join() - self.thread = None + except (KeyboardInterrupt, SystemError), e: + raise e except IOError, e: - print "ioerror", e, e[0] - print dir(e) if e[0] in (32, 104): if hasattr(self, "thread") and self.thread is not None: self.thread.running = False self.thread.join() self.thread = None else: - print '-'*40 - print 'Exception happened during processing of request from' - traceback.print_exc() # XXX But this goes to stderr! - print '-'*40 + logging.info('-'*40) + logging.info('Exception happened during processing of request from') + logging.exception(e) + logging.info('-'*40) self.send_error(404,'File Not Found: %s' % self.path) + raise e class JustAHTTPServer(HTTPServer): @@ -455,23 +354,26 @@ def main(): arg_parser.add_subscriber_group() args = arg_parser.finalize() + if not args.background: + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + logger.addHandler(ch) + + http_host, http_port = resolve_host(args.http_host, args.http_port, args.address_family) server = JustAHTTPServer((http_host, http_port), MyHandler) server.address_family = args.address_family server.args = args - print "%s: starting up http server on '%s:%d'" % ( + logging.info("%s: starting up http server on '%s:%d'", datetime.now().strftime("%x %X"), http_host, http_port) try: server.serve_forever() except KeyboardInterrupt: - print '^C received, shutting down server' + logging.info('^C received, shutting down server') server.socket.close() sys.exit(0) - - if __name__ == '__main__': main() -