| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- #!/usr/bin/python3
- #
- # Copyright (c) 2021 Clementine Computing LLC.
- #
- # This file is part of PopuFare.
- #
- # PopuFare is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # PopuFare is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
- #
- # This is the front end interface to the PIU that will run as
- # a Python application.
- # It displays the video stream and overlays the barcode, if it finds one.
- # It scans in the directory:
- #
- # /home/bus/queue
- #
- # for messages. If found, it then moves them to:
- #
- # /home/bus/log/messages
- #
- # Only the last message (after filename sorting) is chosen.
- # The first line in the message file is the status message.
- # The second line is the error message.
- # Either or both lines can be blank without consequence.
- #
- import sys
- import os
- import tkinter as tk
- from PIL import Image, ImageTk
- import cv2
- import signal
- import tkinter.font as tkfont
- import numpy as np
- from pyzbar import pyzbar
- import datetime
- import imutils
- import time
- from imutils.video.pivideostream import PiVideoStream
- from imutils.video import FPS
- from picamera.array import PiRGBArray
- from picamera import PiCamera
- # signal handlers to capture ctrl-c etc. for "graceful" exit
- #
- def handler(ev):
- root.destroy()
- def check():
- root.after(500, check)
- class MainWindow():
- def __init__(self, window, pistream):
- # variables relating to display messages,
- # how long to display them for, when to
- # remove them from display and what to
- # display when idle.
- #
- self.display_msg = ""
- self.display_msg_error = ""
- self.display_msg_idle = "READY"
- self.display_msg_t = 10000
- self.display_msg_start = 0
- self.display_msg_end = 0
- # log files and locations of communication files
- #
- self.message_dir = "/home/bus/queue"
- self.processed_message_dir = "/home/bus/log/message"
- self.log_dir = "/home/bus/log"
- self.barcode_ofn = os.path.join(self.log_dir, "qrcode.log")
- self.watchdog_ofn = os.path.join(self.log_dir, "piu_app.watchdog")
- # watchdog rate limit variables
- #
- self.watchdog_rate_ms = 2000
- self.watchdog_next = 0
- # our camera stream
- #
- self.pistream = pistream
- pad = 0
- self.disp_pad = pad
- self._orig_geom = "{0}x{1}+0+0".format( window.winfo_screenwidth()-pad, window.winfo_screenheight()-pad )
- self.screen_width = window.winfo_screenwidth()
- self.screen_height = window.winfo_screenheight()
- window.attributes("-fullscreen", True)
- self.cap_width = 400
- self.cap_height = 300
- self.window = window
- # Camera processing and graphical display rate, in milliseconds.
- # This is optimisitic and it looks like Python, or whatever else,
- # is not going to really acheive sub 100ms.
- #
- self.interval = 50
- self.canvas = tk.Canvas(self.window, width=self.screen_width, height=self.screen_height, bg='#000', bd=0, highlightthickness=0, relief='ridge')
- self.canvas.grid(row=0, column=0)
- # variables for bar/qr code scanning.
- # * rate limiting to no log multiple quick reads as independent
- # * save last token read so we aren't limited by the rate limit
- # * `t_prv` and `t_now` for rate limiting caluclations
- #
- self.RATE_LIMIT_MS = 1500.0
- self.LAST_TOK = ""
- self.t_prv = time.time()*1000.0
- self.t_now = self.t_prv
- # kick off our main loop
- #
- self.update_loop()
- def log_barcode(self, data):
- with open( self.barcode_ofn, "a" ) as ofp:
- ofp.write( data + "\n")
- ofp.flush()
- # Look for new messages in the `message` directory
- # (stored in `self.message_dir`) for new messages.
- # Once read, move to the `self.processed_message_dir`
- # directory.
- #
- def message_scan(self):
- msg_fns = []
- for rootdir, dirs, fns in os.walk( self.message_dir ):
- for fn in fns:
- msg_fns.append(fn)
- msg_fns.sort()
- for msg_fn in msg_fns:
- src_fq_msg_fn = os.path.join(self.message_dir, msg_fn)
- dst_fq_msg_fn = os.path.join(self.processed_message_dir, msg_fn)
- with open(src_fq_msg_fn) as fp:
- msg_a = fp.read().split("\n")
- self.display_msg = msg_a[0][0:10]
- if len(msg_a) > 1:
- self.display_msg_error = msg_a[1][0:10]
- self.display_msg_start = time.time() * 1000.0
- self.display_msg_end = self.display_msg_start + self.display_msg_t
- os.replace(src_fq_msg_fn, dst_fq_msg_fn)
- def message_update(self):
- t = time.time() * 1000.0
- # Clear the current message if we've exceeded
- # the `self.display_msg_end` time.
- # Put in the idle message.
- #
- if t > self.display_msg_end:
- self.display_msg = ""
- self.display_msg_error = ""
- self.display_msg = self.display_msg_idle
- # @rite to watchdog file with current time (in seconds, since
- # epoch) at a rate of `self.watchdog_rate_ms` (in milliseconds).
- # Monitoring processes can check this file to make sure the
- # piu_app is still running.
- #
- def watchdog(self):
- t = time.time() * 1000.0
- if t > self.watchdog_next:
- with open( self.watchdog_ofn, "w" ) as ofp:
- ofp.write( "{}".format(int(t)) + "\n" )
- ofp.flush()
- self.watchdog_next = t + self.watchdog_rate_ms
- def update_loop(self):
- self.watchdog()
- # scan for PIU messages to display on-screen
- #
- self.message_scan()
- self.message_update()
- # grab our camera and resize apporpriately
- #
- img_cv2 = None
- img_cv2_o = self.pistream.read()
- img_cv2 = imutils.resize(img_cv2_o, width=self.cap_width)
- # keep oriignal image size to process barcodes, resize
- # for display and keep rescale factor to properly overla
- # the rectangle and other indicator text.
- #
- _r = float(self.cap_width) / float(img_cv2_o.shape[1])
- ## overlay barcode rectanges and text on cv2 image
- ##
- barcodes = pyzbar.decode(img_cv2_o)
- show_text = None
- for barcode in barcodes:
- # Show overlay of scanned barcode with text information
- # on what information was scanned.
- #
- (x_o,y_o,w_o,h_o) = barcode.rect
- (x,y,w,h) = ( int(float(x_o)*_r), int(float(y_o)*_r), int(float(w_o)*_r), int(float(h_o)*_r) )
- cv2.rectangle(img_cv2, (x,y), (x+w, y+h), (0, 0, 255), 2)
- barcode_data = barcode.data.decode("utf-8")
- barcode_type = barcode.type
- show_text = "{} ({})".format(barcode_data, barcode_type)
- # we flip the image (below), so we need to transform the x
- # coordinate
- #
- _w = img_cv2.shape[1]
- cam_img_txt_org = (_w - x - w, y - 10)
- # We will be sensing bar/qr codes continuiously (50ms ideally,
- # though probably slower due to Python inefficiencies), so
- # we only consider bar/qr code data if the scanned code is
- # different from our last scan _or_ if we've waited past
- # our 'RATE_LIMIT_MS' limit.
- # Codes scanned are logged to the barcode file, which another
- # process should pickup and process.
- #
- self.t_now = time.time()*1000.0
- if self.LAST_TOK != barcode_data:
- self.log_barcode( str(int(time.time())) + ": " + barcode_data )
- self.t_prv = self.t_now
- self.LAST_TOK = barcode_data
- elif (self.t_now - self.t_prv) >= self.RATE_LIMIT_MS:
- self.log_barcode( str(int(time.time())) + ": " + barcode_data )
- self.t_prv = self.t_now
- self.LAST_TOK = barcode_data
- else:
- pass
- # We need to convert from whatever native format opencv returns
- # to the format that tkinter expects.
- # We also flip the image so that the camera is much more like
- # a "mirror", for ease of media placement under/in front of the
- # camera.
- #
- self.image = cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB)
- self.image = cv2.flip(self.image, 1)
- # to get the barcode text to not be reversed, we overlay
- #the bar/qr code text here.
- #
- if show_text is not None:
- _w = self.image.shape[1]
- cv2.putText(self.image, show_text, cam_img_txt_org,
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, (235, 116, 108), 2 )
- self.image = Image.fromarray(self.image)
- self.image = ImageTk.PhotoImage(self.image)
- disp_cam_dy = int(self.screen_height - self.cap_height)
- disp_cam_dx = int((self.screen_width - self.cap_width)/2)
- disp_cam_dx -= 1
- if disp_cam_dx<0: disp_cam_dx=0
- midx = int(self.screen_width/2)
- # Do some formatting of date-time.
- # "Blink" the ':' each second.
- #
- _now = datetime.datetime.now()
- yyyymmdd = _now.strftime("%Y-%m-%d")
- time_str = _now.strftime("%a %I") + [":", " "][int(_now.strftime("%S"))%2] + _now.strftime("%M%p")
- # text_msg are the messages to be displayed.
- # The position in the array is the vertical position and 'type' of message
- #
- # 0-1: date-time
- # 2: status
- # 3-4: error
- #
- text_msg = [ yyyymmdd, time_str, self.display_msg, self.display_msg_error ]
- text_msg_dy = [ 60, 80, 80, 60 , 0 ]
- self.canvas.delete("all")
- # txt_dy is the amount to shift the text down by
- # (txt_x, txt_y) is the current (center) position of displayed text
- #
- txt_dy = 80
- txt_x = midx
- txt_y = int(txt_dy/2)
- for idx,msg in enumerate(text_msg):
- font = None
- if idx==0:
- font = tkfont.Font(family="monospace", weight="bold", size=60)
- else:
- font = tkfont.Font(family="monospace", weight="bold", size=50)
- # change color depending on position
- # top two rows are normal text (date and time)
- # third row is status message (blue)
- # fourth and fifth row are error messages
- #
- color = "#aaa"
- if idx == 2: color = "#6194fa"
- elif idx > 2: color = "#fa6171"
- self.canvas.create_text(txt_x, txt_y, anchor=tk.CENTER, text=msg, font=font, fill=color)
- txt_y += text_msg_dy[idx]
- # draw our built up tk canvas and setupt the interval callback
- #
- self.canvas.create_image(disp_cam_dx, disp_cam_dy, anchor=tk.NW, image=self.image)
- self.window.after(self.interval, self.update_loop)
- if __name__ == "__main__":
- root = tk.Tk()
- pistream = PiVideoStream().start()
- time.sleep(2.0)
- MainWindow(root, pistream)
- # catch signals so we can gracefully-ish exit on a ctrl-c
- #
- signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
- root.after(500, check)
- root.bind_all('<Control-c>', handler)
- root.configure(bg='#000')
- root.mainloop()
|