Python bluetooth en service old school
Python bluetooth
En pleine préparation des futurs examens, me voilà entrain de simuler une balance connectée bluetooth. L'idée étant de prendre une carte beaglebone black, une clé Broadcom Corp. BCM20702A0 Bluetooth 4.0 et de mettre en place un deamon qui répond à des demandes. Simple quoi...
La base
Je suis parti sur une distribution Ubuntu Trusty armhf. La clé bluetooth est reconnue out of the box.
ubuntu@ubuntu-armhf:~$ lsusb
Bus 002 Device 002: ID 0a5c:21e8 Broadcom Corp. BCM20702A0 Bluetooth 4.0
J'ai forcé la carte bluetooth à être détectable (visible).
$ hciconfig
hci0: Type: BR/EDR Bus: USB
BD Address: XX:XX:XX:XX:XX:XX ACL MTU: 1021:8 SCO MTU: 64:1
UP RUNNING PSCAN ISCAN
RX bytes:2694 acl:36 sco:0 events:116 errors:0
TX bytes:2253 acl:38 sco:0 commands:76 errors:0
# hciconfig hci0 piscan
J'ai ainsi pu la voir depuis mon portable.
druide@druide:~$ hcitool scan
Scanning ...
XX:XX:XX:XX:XX:XX ubuntu-armhf-0
Pour être sûr que la carte soit toujours "visible", j'ai ajouté la commande au fichier /etc/rc.local.
Là dessus, j'ai installé les outils bluez.
# apt-get install bluez bluez-tools python-gobject python-dbus python-bluez
Et pour faire un script python "en mode serveur", rien de plus simple, il suffit d'aller prendre l'exemple dans le répertoire /usr/share/doc/python-bluez/examples/simple/rfcomm-server.py.
Je l'ai ensuite "daemonizé" à l'aide de la librairie du même nom daemonize. Pour ce faire, j'ai commencé par installé les paquets suivants:
# apt-get install python-pip
# pip install daemonize
Le code du serveur python est très basique. Il reprend la structure de l'exemple auquel j'ai ajouté une réponse en fonction d'une demande de profile. Il y a 3 profiles. Les réponses se font sous la forme d'un flux JSON:
{
"profil":"3",
"poids":"86.8",
"date_iso8601":"2016-02-09T13:52:45.544907"
}
[collapse collapsed]
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import logging, sys, signal, string, random, time, datetime
from daemonize import Daemonize
from bluetooth import *
def signal_handler(signal, frame):
global server_sock
server_sock.close()
print('Exit with Ctrl+C!')
sys.exit(0)
pid = "/var/run/balance.pid"
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = False
fh = logging.FileHandler("/tmp/balance.log", "w")
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
keep_fds = [fh.stream.fileno()]
def main():
logger.debug("Balance")
server_sock=BluetoothSocket( RFCOMM )
server_sock.bind(("",PORT_ANY))
server_sock.listen(1)
port = server_sock.getsockname()[1]
uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
advertise_service( server_sock, "SampleServer",
service_id = uuid,
service_classes = [ uuid, SERIAL_PORT_CLASS ],
profiles = [ SERIAL_PORT_PROFILE ],
)
while True:
logger.debug("Waiting for connection on RFCOMM channel %d" % port)
client_sock, client_info = server_sock.accept()
logger.debug("Accepted connection from ", client_info)
try:
poids = 0.0
ts = time.time()
dt = datetime.datetime.fromtimestamp(ts).isoformat()
tosend = "{\n\"profil\":\"unknown\",\n\"poids\":\"0.0\",\n\"date_iso8601\":\"none\"\n}"
data = client_sock.recv(1024)
logger.debug("received [%s]" % data)
if ":" in data:
cmds = string.split(data, ":")
if cmds[0] == "profil":
if cmds[1] == "1":
poids = random.uniform(65.0, 67.0)
if cmds[1] == "2":
poids = random.uniform(60.0, 63.0)
if cmds[1] == "3":
poids = random.uniform(85.0, 90.0)
ts = time.time()
dt = datetime.datetime.fromtimestamp(ts).isoformat()
tosend = "{\n\"profil\":\"%s\",\n\"poids\":\"%0.1f\",\n\"date_iso8601\":\"%s\"\n}" % (cmds[1],poids,dt)
client_sock.send(tosend)
else:
client_sock.send(tosend)
else:
client_sock.send(tosend)
except IOError:
pass
logger.debug("disconnected")
client_sock.close()
logger.debug("all done")
#signal.signal(signal.SIGINT, signal_handler)
daemon = Daemonize(app="balance_app", pid=pid, action=main, keep_fds=keep_fds)
daemon.start()
[/collapse]
Old School
La version old school repose sur un script de démarrage que l'on place dans /etc/init.d. Je nomme le fichier balance.sh. Je me suis basé sur un skeleton trouvé sur le web. Je l'ai adapté à mes besoins et je l'ai également chmod +x. Reste à créer les liens pour les différents runlevels.
# update-rc.d balance.sh defaults
Adding system startup for /etc/init.d/balance.sh ...
/etc/rc0.d/K20balance.sh -> ../init.d/balance.sh
/etc/rc1.d/K20balance.sh -> ../init.d/balance.sh
/etc/rc6.d/K20balance.sh -> ../init.d/balance.sh
/etc/rc2.d/S20balance.sh -> ../init.d/balance.sh
/etc/rc3.d/S20balance.sh -> ../init.d/balance.sh
/etc/rc4.d/S20balance.sh -> ../init.d/balance.sh
/etc/rc5.d/S20balance.sh -> ../init.d/balance.sh
Et voilà, maintenant, il démarre automatiquement dans le runlevel courant
$ runlevel
N 2
$ cat /var/run/balance.pid
669
$ ps -A | grep 669
669 ? 00:00:00 python
Le test
Depuis mon portable, je crée un script python simple, permettant de tester ma connexion et mon serveur.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import bluetooth
from random import randint
bd_addr = "XX:XX:XX:XX:XX:XX"
port = 1
try:
sock=bluetooth.BluetoothSocket( bluetooth.RFCOMM )
sock.connect((bd_addr, port))
rnd = randint(1,3)
toSend = "profil:%d" % rnd
sock.send(toSend)
data = sock.recv(1024)
print "received [%s]" % data
sock.close()
except IOError, e:
print(e)
Ce qui donne:
druide@druide:~$ python client.py
received [{
"profil":"3",
"poids":"88.7",
"date_iso8601":"2016-02-09T14:38:30.239851"
}]