#!/usr/bin/python3
#
# Copyright (c) 2021 Clementine Computing LLC.
#
# This file is part of PopuFare.
#
# PopuFare is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PopuFare is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
#
# This is the front end interface to the PIU that will run as
# a Python application.
# It displays the video stream and overlays the barcode, if it finds one.
# It scans in the directory:
#
# /home/bus/queue
#
# for messages. If found, it then moves them to:
#
# /home/bus/log/message
#
# Only the last message (after filename sorting) is chosen.
# The first line in the message file is the status message.
# The second line is the error message.
# Either or both lines can be blank without consequence.
#
import sys
import os
import tkinter as tk
from PIL import Image, ImageTk
import cv2
import signal
import tkinter.font as tkfont
import numpy as np
from pyzbar import pyzbar
import datetime
import imutils
import time
from imutils.video.pivideostream import PiVideoStream
from imutils.video import FPS
from picamera.array import PiRGBArray
from picamera import PiCamera
# signal handlers to capture ctrl-c etc. for "graceful" exit
#
def handler(ev):
    """Tear down the application by destroying the Tk root window.

    The argument is never inspected: Tk passes an event object when this
    runs as a key binding, and the SIGINT hook in the script section calls
    it with ``None``.
    """
    root.destroy()
def check():
    """Re-arm this function on the Tk event loop every 500 ms.

    The callback does no work of its own; presumably its purpose is to hand
    control back to the Python interpreter regularly so that a pending
    signal handler (e.g. for ctrl-c) gets a chance to run while Tk is
    otherwise idle.
    """
    root.after(500, check)
class MainWindow():
    """Full-screen Tk front end for the PIU.

    On every tick of `update_loop` the window:

    * refreshes the watchdog file,
    * picks up status/error messages dropped into `self.message_dir` and
      moves them to `self.processed_message_dir`,
    * reads a camera frame, decodes any bar/qr codes in it and appends
      them (rate limited) to `self.barcode_ofn`,
    * redraws the date, time, status line, error line and the mirrored
      camera preview.
    """

    # Status and error lines are clipped to this many characters before
    # being displayed.
    MAX_MSG_CHARS = 10

    def __init__(self, window, pistream):
        """Set up state, build the canvas and start the update loop.

        window   -- the Tk root window to draw into (switched to fullscreen).
        pistream -- started video stream object; only its `read()` method
                    is used, which is expected to return a BGR frame
                    (or None when no frame is available yet).
        """

        # variables relating to display messages,
        # how long to display them for, when to
        # remove them from display and what to
        # display when idle.
        #
        self.display_msg = ""
        self.display_msg_error = ""
        self.display_msg_idle = "READY"
        self.display_msg_t = 10000
        self.display_msg_start = 0
        self.display_msg_end = 0

        # log files and locations of communication files
        #
        self.message_dir = "/home/bus/queue"
        self.processed_message_dir = "/home/bus/log/message"
        self.log_dir = "/home/bus/log"
        self.barcode_ofn = os.path.join(self.log_dir, "qrcode.log")
        self.watchdog_ofn = os.path.join(self.log_dir, "piu_app.watchdog")

        # watchdog rate limit variables (milliseconds)
        #
        self.watchdog_rate_ms = 2000
        self.watchdog_next = 0

        # our camera stream
        #
        self.pistream = pistream

        pad = 0
        self.disp_pad = pad
        self._orig_geom = "{0}x{1}+0+0".format(
            window.winfo_screenwidth() - pad,
            window.winfo_screenheight() - pad)
        self.screen_width = window.winfo_screenwidth()
        self.screen_height = window.winfo_screenheight()
        window.attributes("-fullscreen", True)

        # size of the camera preview drawn on screen
        #
        self.cap_width = 400
        self.cap_height = 300

        self.window = window

        # Camera processing and graphical display rate, in milliseconds.
        # This is optimistic; in practice the loop is unlikely to
        # achieve sub 100ms.
        #
        self.interval = 50

        self.canvas = tk.Canvas(self.window,
                                width=self.screen_width,
                                height=self.screen_height,
                                bg='#000', bd=0,
                                highlightthickness=0, relief='ridge')
        self.canvas.grid(row=0, column=0)

        # Fonts are created once here instead of on every frame; the
        # date row is larger than the remaining rows.
        #
        self._font_date = tkfont.Font(root=window, family="monospace",
                                      weight="bold", size=60)
        self._font_body = tkfont.Font(root=window, family="monospace",
                                      weight="bold", size=50)

        # Last rendered camera image. A reference has to be kept on the
        # instance because the canvas item does not keep the PhotoImage
        # alive on its own.
        #
        self.image = None

        # variables for bar/qr code scanning.
        #   * rate limiting so multiple quick reads are not logged as
        #     independent scans
        #   * last token read, so a *different* code is never held back
        #     by the rate limit
        #   * `t_prv` and `t_now` for rate limiting calculations (ms)
        #
        self.RATE_LIMIT_MS = 1500.0
        self.LAST_TOK = ""
        self.t_prv = time.time() * 1000.0
        self.t_now = self.t_prv

        # kick off our main loop
        #
        self.update_loop()

    def log_barcode(self, data):
        """Append `data` as one line to the barcode log file."""
        # Leaving the `with` block closes (and thereby flushes) the file.
        with open(self.barcode_ofn, "a") as ofp:
            ofp.write(data + "\n")

    # Look in the message directory (`self.message_dir`) for new messages.
    # Once read, each message is moved to `self.processed_message_dir`.
    #
    def message_scan(self):
        """Consume queued message files and update the displayed text.

        Files directly inside `self.message_dir` are processed in sorted
        filename order, so the last one wins. The first line of a file is
        the status message and the second line the error message; both are
        clipped to `MAX_MSG_CHARS` characters. A file without a second line
        clears the error message rather than leaving a stale one on screen.

        A missing or unreadable message directory is treated as "no
        messages". Subdirectories are ignored.
        """
        try:
            with os.scandir(self.message_dir) as entries:
                msg_fns = sorted(e.name for e in entries if e.is_file())
        except OSError:
            return

        if not msg_fns:
            return

        # Make sure the destination exists, otherwise the move below
        # would raise and stop the update loop.
        os.makedirs(self.processed_message_dir, exist_ok=True)

        limit = self.MAX_MSG_CHARS
        for msg_fn in msg_fns:
            src_fq_msg_fn = os.path.join(self.message_dir, msg_fn)
            dst_fq_msg_fn = os.path.join(self.processed_message_dir, msg_fn)

            # Undecodable bytes are replaced instead of raising, so a
            # malformed message file cannot take the display down.
            with open(src_fq_msg_fn, errors="replace") as fp:
                msg_a = fp.read().split("\n")

            self.display_msg = msg_a[0][:limit]
            self.display_msg_error = msg_a[1][:limit] if len(msg_a) > 1 else ""

            self.display_msg_start = time.time() * 1000.0
            self.display_msg_end = self.display_msg_start + self.display_msg_t

            os.replace(src_fq_msg_fn, dst_fq_msg_fn)

    def message_update(self):
        """Fall back to the idle message once the current one has expired."""
        now_ms = time.time() * 1000.0
        if now_ms > self.display_msg_end:
            self.display_msg_error = ""
            self.display_msg = self.display_msg_idle

    # Write the current time to the watchdog file at most once every
    # `self.watchdog_rate_ms` milliseconds.
    # Monitoring processes can check this file to make sure the
    # piu_app is still running.
    #
    def watchdog(self):
        """Refresh the watchdog file if the rate limit has elapsed.

        NOTE(review): the value written is the epoch time in
        *milliseconds* (an earlier comment said seconds). The value is
        left unchanged here because an external monitor may depend on it;
        confirm against that monitor.
        """
        now_ms = time.time() * 1000.0
        if now_ms > self.watchdog_next:
            with open(self.watchdog_ofn, "w") as ofp:
                ofp.write("{}".format(int(now_ms)) + "\n")
            self.watchdog_next = now_ms + self.watchdog_rate_ms

    def _record_barcode(self, barcode_data):
        """Log a decoded code unless it is a rapid repeat of the last one.

        We sense bar/qr codes continuously, so a code is only logged when
        it differs from the last logged code _or_ when at least
        `RATE_LIMIT_MS` has passed since the last log entry. Logged codes
        go to the barcode file, which another process should pick up.
        """
        self.t_now = time.time() * 1000.0
        is_new_token = barcode_data != self.LAST_TOK
        if is_new_token or (self.t_now - self.t_prv) >= self.RATE_LIMIT_MS:
            self.log_barcode(str(int(time.time())) + ": " + barcode_data)
            self.t_prv = self.t_now
            self.LAST_TOK = barcode_data

    def _render_frame(self, frame):
        """Decode codes in `frame` and return the preview as a PhotoImage.

        Codes are decoded on the original frame; the rectangle and text
        overlays are drawn on a copy resized to `self.cap_width`, so the
        coordinates are scaled by the resize factor.
        """
        preview = imutils.resize(frame, width=self.cap_width)
        scale = float(self.cap_width) / float(frame.shape[1])

        overlay_text = None
        overlay_org = None
        for barcode in pyzbar.decode(frame):
            x, y, w, h = (int(float(v) * scale) for v in barcode.rect)
            cv2.rectangle(preview, (x, y), (x + w, y + h), (0, 0, 255), 2)

            # The payload comes from whatever is held up to the camera;
            # replace invalid bytes rather than letting a non-UTF-8 code
            # raise and stop the update loop.
            barcode_data = barcode.data.decode("utf-8", errors="replace")
            overlay_text = "{} ({})".format(barcode_data, barcode.type)

            # The image is flipped below, so mirror the x coordinate of
            # the text origin. Only the last code's text is shown.
            overlay_org = (preview.shape[1] - x - w, y - 10)

            self._record_barcode(barcode_data)

        # Convert from OpenCV's BGR to the RGB that PIL/tkinter expect
        # and flip horizontally so the camera acts like a "mirror", for
        # ease of media placement under/in front of the camera.
        rgb = cv2.flip(cv2.cvtColor(preview, cv2.COLOR_BGR2RGB), 1)

        # The text is drawn after the flip so it is not mirrored.
        if overlay_text is not None:
            cv2.putText(rgb, overlay_text, overlay_org,
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (235, 116, 108), 2)

        return ImageTk.PhotoImage(Image.fromarray(rgb))

    def _draw_text_rows(self):
        """Draw the date, time, status and error rows onto the canvas."""
        now = datetime.datetime.now()
        yyyymmdd = now.strftime("%Y-%m-%d")

        # "Blink" the ':' each second.
        separator = ":" if now.second % 2 == 0 else " "
        time_str = now.strftime("%a %I") + separator + now.strftime("%M%p")

        # (text, font, colour, distance to the next row) from top to bottom:
        #   date, time   -- normal text
        #   status       -- blue
        #   error        -- red
        rows = [
            (yyyymmdd, self._font_date, "#aaa", 60),
            (time_str, self._font_body, "#aaa", 80),
            (self.display_msg, self._font_body, "#6194fa", 80),
            (self.display_msg_error, self._font_body, "#fa6171", 60),
        ]

        # (txt_x, txt_y) is the centre of the row currently being drawn
        txt_x = int(self.screen_width / 2)
        txt_y = int(80 / 2)
        for text, font, color, dy in rows:
            self.canvas.create_text(txt_x, txt_y, anchor=tk.CENTER,
                                    text=text, font=font, fill=color)
            txt_y += dy

    def update_loop(self):
        """Run one iteration of the main loop and schedule the next one."""
        self.watchdog()

        # scan for PIU messages to display on-screen
        #
        self.message_scan()
        self.message_update()

        # The stream may not have produced a frame yet; in that case keep
        # whatever image was rendered last instead of crashing.
        frame = self.pistream.read()
        if frame is not None:
            self.image = self._render_frame(frame)

        self.canvas.delete("all")
        self._draw_text_rows()

        # The camera preview sits at the bottom of the screen, centred
        # horizontally.
        if self.image is not None:
            disp_cam_dy = int(self.screen_height - self.cap_height)
            disp_cam_dx = max(
                int((self.screen_width - self.cap_width) / 2) - 1, 0)
            self.canvas.create_image(disp_cam_dx, disp_cam_dy,
                                     anchor=tk.NW, image=self.image)

        # set up the interval callback
        #
        self.window.after(self.interval, self.update_loop)
if __name__ == "__main__":
    # `root` is deliberately a module-level name: `handler` and `check`
    # refer to it.
    root = tk.Tk()

    pistream = PiVideoStream().start()
    # presumably gives the camera time to warm up before the first read
    time.sleep(2.0)

    try:
        MainWindow(root, pistream)

        # catch signals so we can gracefully-ish exit on a ctrl-c
        #
        signal.signal(signal.SIGINT,
                      lambda signum, frame: print('terminal ^C') or handler(None))
        root.after(500, check)

        # An empty event sequence is rejected by Tk; bind ctrl-c typed
        # into the window to the same shutdown handler.
        root.bind_all('<Control-c>', handler)

        root.configure(bg='#000')
        root.mainloop()
    finally:
        # stop the camera capture thread so the process can exit cleanly
        pistream.stop()