| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- #!/usr/bin/python3
- #
- # Copyright (c) 2021 Clementine Computing LLC.
- #
- # This file is part of PopuFare.
- #
- # PopuFare is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # PopuFare is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with PopuFare. If not, see <https://www.gnu.org/licenses/>.
- #
- import serial
- import re
- import math
- import time
- import sys
- import os
- from datetime import datetime
PIU_FARED_VERSION = "0.1.0"

# Process start time (whole seconds since the epoch); used below as the
# initial "last forwarded" timestamp for every credential type.
cur_t = int(time.time())

# Runtime configuration and mutable state shared by the whole script.
opt = {
    # logging level: > 0 logs outgoing messages, > 1 also logs state writes
    "verbose": 2,

    # file holding the last-forwarded timestamps between runs
    "state_fn": "/home/bus/config/piufared.state",

    # pause (seconds) at the end of each main-loop pass
    "sleepy": 0.015625,

    # init-message rate limiting: next allowed send time, and the interval
    "ratelimit_mark": 0.0,
    "ratelimit_t": 10.0,

    # serial link to the PIU
    "dev": "/dev/ttyPIU",
    "baud": 115200,
    "serial_timeout": 0.05,

    # maximum number of buffered characters before a line is forced through
    "MAX_BUF": 1024,

    # credential logs that are polled for new entries
    "mag_fn": "/home/bus/log/credential.mag",
    "rfid_fn": "/home/bus/log/credential.rfid",
    "qr_fn": "/home/bus/log/credential.barcode",

    # timestamp of the newest entry already forwarded, per credential type
    "mag_ts": cur_t,
    "rfid_ts": cur_t,
    "qr_ts": cur_t,
}
def _DT():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string.

    Used as the timestamp prefix on every log line this script prints.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = datetime.now()
    return now.strftime(stamp_format)
def write_state(_opt):
    """Persist the last-forwarded credential timestamps to the state file.

    The file is rewritten from scratch with one ``<key> <value>`` pair per
    line, in the order ``mag_ts``, ``rfid_ts``, ``qr_ts`` -- the format that
    ``read_state`` parses.

    Parameters:
        _opt: options dict providing ``state_fn`` (destination path) and the
            ``mag_ts`` / ``rfid_ts`` / ``qr_ts`` values to store.

    Raises:
        OSError: if the state file cannot be opened for writing.
        KeyError: if any of the required keys is missing from ``_opt``.
    """
    # Use the path from the dict we were handed, not the module-level `opt`,
    # so the function honours its argument and matches read_state().
    records = [
        "mag_ts " + str(_opt["mag_ts"]),
        "rfid_ts " + str(_opt["rfid_ts"]),
        "qr_ts " + str(_opt["qr_ts"]),
    ]
    with open(_opt["state_fn"], "w") as ofp:
        ofp.write("\n".join(records) + "\n")
def read_state(_opt):
    """Load the last-forwarded credential timestamps from the state file.

    Each line of the file is split on single spaces. When the first field is
    ``mag_ts``, ``rfid_ts`` or ``qr_ts`` the second field is converted with
    ``int`` and stored in ``_opt`` under that key. Lines with fewer than two
    fields, or with any other first field, are ignored.

    Parameters:
        _opt: options dict; ``state_fn`` names the file to read and the
            timestamp keys are updated in place.

    Raises:
        OSError: if the state file cannot be opened.
        ValueError: if a recognised key is followed by a non-integer value.
    """
    tracked_keys = ("mag_ts", "rfid_ts", "qr_ts")

    with open(_opt["state_fn"]) as state_file:
        contents = state_file.read()

    for record in contents.split("\n"):
        fields = record.split(" ")
        if len(fields) >= 2 and fields[0] in tracked_keys:
            _opt[fields[0]] = int(fields[1])
def send_data(_ser, _s):
    """Write the string ``_s`` to ``_ser`` as UTF-8 bytes.

    Nothing is appended to ``_s``: the caller is responsible for including
    any line terminator in the string it passes.

    Parameters:
        _ser: object exposing ``write(bytes)`` (the open serial port).
        _s: text to transmit.
    """
    payload = _s.encode("utf-8")
    _ser.write(payload)
# ---------------------------------------------------------------------------
# Main script.
#
# Relays traffic in both directions between the PIU on the serial port and
# the rest of the system:
#   * lines received from the PIU of the form "/0:<msg>" or "/1:<msg>" are
#     handed to the piumsg helper program;
#   * new entries appended to the mag / rfid / qr credential logs are sent
#     to the PIU, and the newest forwarded timestamp is saved to the state
#     file.
# ---------------------------------------------------------------------------
import subprocess  # needed only by _relay_to_piumsg() below

_PIUMSG_BIN = "/home/bus/bin/piumsg"

# One row per credential log:
#   (opt key of the log file, opt key of its timestamp, PIU prefix, log label)
_CREDENTIAL_SOURCES = (
    ("mag_fn", "mag_ts", "/M:", "mag"),
    ("rfid_fn", "rfid_ts", "/R:", "rfid"),
    # NOTE(review): QR entries are sent with the same "/M:" prefix as mag
    # entries, exactly as the code did before -- confirm this is intended.
    ("qr_fn", "qr_ts", "/M:", "qr"),
)


def _relay_to_piumsg(command, msg):
    """Run piumsg with the single argument ``relay <command> <msg>``."""
    # `msg` arrives over the serial line, so it is passed as an argv element
    # rather than spliced into a shell string: quotes or shell metacharacters
    # in it can neither break the command nor be executed.
    try:
        subprocess.run([_PIUMSG_BIN, "relay " + command + " " + msg])
    except OSError as err:
        # Relaying is best-effort; a missing or unrunnable helper must not
        # take the daemon down.
        print(_DT(), "[piufared]", "unable to run piumsg:", err)


def _forward_new_credentials(_ser, _opt, fn_key, ts_key, prefix, label):
    """Send every not-yet-forwarded entry of one credential log to the PIU.

    Log lines are expected as ``<timestamp>[:...] <credential>``. An entry
    is forwarded when its integer timestamp is greater than ``_opt[ts_key]``,
    which is then advanced to that timestamp.

    Returns True if at least one entry was forwarded.
    """
    with open(_opt[fn_key]) as fp:
        lines = fp.read().split("\n")

    forwarded = False
    # The final element is whatever follows the last newline (empty, or a
    # line still being written), so it is never processed.
    for line in lines[:-1]:
        tok = line.split(" ")
        if len(tok) != 2:
            continue
        _ts = int(tok[0].split(":")[0])
        if _ts <= _opt[ts_key]:
            continue
        if _opt["verbose"] > 0:
            print(_DT(), "[piufared]", "sending " + label, tok[1])
        send_data(_ser, prefix + tok[1] + "\r\n")
        _opt[ts_key] = _ts
        forwarded = True
    return forwarded


# If we don't have a state file, create one from the defaults in `opt`
# (the process start time for every credential type), then load it.
if not os.path.isfile(opt["state_fn"]):
    write_state(opt)
read_state(opt)

piu_serial = serial.Serial(opt["dev"], baudrate=opt["baud"], timeout=opt["serial_timeout"])
sys.stdout.flush()

buf = []
while True:

    # --- 1. Read from the PIU until end-of-line, an empty read, or a full
    #        buffer. Characters of an unfinished line stay in `buf` for the
    #        next pass.
    eol = False
    read_len = -1
    have_data = False
    while (len(buf) < opt["MAX_BUF"]) and (not eol) and (read_len != 0):
        b = piu_serial.read()
        read_len = len(b)  # 0 means nothing arrived within the serial timeout
        for _b in b:
            if _b in b"\r\n":
                eol = True
                have_data = True
                break
            buf.append(chr(_b))

    # A full buffer is processed even without a line terminator.
    if len(buf) >= opt["MAX_BUF"]:
        have_data = True

    # --- 2. Handle a completed line from the PIU.
    if have_data:
        s = "".join(buf)
        buf = []

        if len(s) == 0:
            # Bare line terminator: reply with the init message, but no more
            # often than once every opt["ratelimit_t"] seconds.
            if time.time() > opt["ratelimit_mark"]:
                if opt["verbose"] > 0:
                    print(_DT(), "[piufared]", "sending init msg '/?: ?=rider_ui\\r\\n")
                    sys.stdout.flush()
                send_data(piu_serial, "/?: ?=rider_ui\r\n")
                opt["ratelimit_mark"] = time.time() + opt["ratelimit_t"]
            continue

        # Only lines shaped like "/<type>:<payload>" are accepted.
        if len(s) < 3:
            continue
        if s[0] != '/':
            continue
        if s[2] != ':':
            continue

        msg = s[3:]
        if s[1] == '0':
            _relay_to_piumsg("passenger_message", msg)
        elif s[1] == '1':
            _relay_to_piumsg("passenger_notify", msg)

    # --- 3. Forward new credential log entries to the PIU. Every source is
    #        polled on each pass, whether or not an earlier one had entries.
    state_change = False
    for _fn_key, _ts_key, _prefix, _label in _CREDENTIAL_SOURCES:
        if _forward_new_credentials(piu_serial, opt, _fn_key, _ts_key, _prefix, _label):
            state_change = True

    # --- 4. Save the advanced timestamps so entries are not sent again.
    if state_change:
        if opt["verbose"] > 1:
            print(_DT(), "[piufared]", "state change, writing")
        write_state(opt)

    time.sleep(opt["sleepy"])
|