Python bluetooth en service old school

 druide

Python bluetooth

En pleine préparation des futurs examens, me voilà entrain de simuler une balance connectée bluetooth. L'idée étant de prendre une carte beaglebone black, une clé Broadcom Corp. BCM20702A0 Bluetooth 4.0 et de mettre en place un deamon qui répond à des demandes. Simple quoi...

La base

Je suis parti sur une distribution Ubuntu Trusty armhf. La clé bluetooth est reconnue out of the box.

ubuntu@ubuntu-armhf:~$ lsusb
Bus 002 Device 002: ID 0a5c:21e8 Broadcom Corp. BCM20702A0 Bluetooth 4.0


J'ai forcé la carte bluetooth à être détectable (visible).


$ hciconfig
hci0:	Type: BR/EDR  Bus: USB
	BD Address: XX:XX:XX:XX:XX:XX  ACL MTU: 1021:8  SCO MTU: 64:1
	UP RUNNING PSCAN ISCAN 
	RX bytes:2694 acl:36 sco:0 events:116 errors:0
	TX bytes:2253 acl:38 sco:0 commands:76 errors:0

# hciconfig hci0 piscan


J'ai ainsi pu la voir depuis mon portable.


druide@druide:~$ hcitool scan
Scanning ...
	XX:XX:XX:XX:XX:XX	ubuntu-armhf-0
Pour être sûr que la carte soit toujours "visible", j'ai ajouté la commande au fichier /etc/rc.local. Là dessus, j'ai installé les outils bluez.

# apt-get install bluez bluez-tools python-gobject python-dbus python-bluez
Et pour faire un script python "en mode serveur", rien de plus simple, il suffit d'aller prendre l'exemple dans le répertoire /usr/share/doc/python-bluez/examples/simple/rfcomm-server.py. Je l'ai ensuite "daemonizé" à l'aide de la librairie du même nom daemonize. Pour ce faire, j'ai commencé par installé les paquets suivants:

# apt-get install python-pip
# pip install daemonize


Le code du serveur python est très basique. Il reprend la structure de l'exemple auquel j'ai ajouté une réponse en fonction d'une demande de profile. Il y a 3 profiles. Les réponses se font sous la forme d'un flux JSON:


{
    "profil":"3",
    "poids":"86.8",
    "date_iso8601":"2016-02-09T13:52:45.544907"
}


[collapse collapsed]


#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import logging, sys, signal, string, random, time, datetime
from daemonize import Daemonize
from bluetooth import *

def signal_handler(signal, frame):
	global server_sock
	server_sock.close()
	print('Exit with Ctrl+C!')
	sys.exit(0)


pid = "/var/run/balance.pid"
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = False
fh = logging.FileHandler("/tmp/balance.log", "w")
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
keep_fds = [fh.stream.fileno()]


def main():
	logger.debug("Balance")
	server_sock=BluetoothSocket( RFCOMM )
	server_sock.bind(("",PORT_ANY))
	server_sock.listen(1)

	port = server_sock.getsockname()[1]

	uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"

	advertise_service( server_sock, "SampleServer",
					   service_id = uuid,
					   service_classes = [ uuid, SERIAL_PORT_CLASS ],
					   profiles = [ SERIAL_PORT_PROFILE ],
						)
	while True:
		logger.debug("Waiting for connection on RFCOMM channel %d" % port)
		client_sock, client_info = server_sock.accept()
		logger.debug("Accepted connection from ", client_info)

		try:
			poids = 0.0
			ts = time.time()
			dt = datetime.datetime.fromtimestamp(ts).isoformat()
			tosend = "{\n\"profil\":\"unknown\",\n\"poids\":\"0.0\",\n\"date_iso8601\":\"none\"\n}"
			data = client_sock.recv(1024)
			logger.debug("received [%s]" % data)
			if ":" in data:
				cmds = string.split(data, ":")
				if cmds[0] == "profil":
					if cmds[1] == "1":
						poids = random.uniform(65.0, 67.0)
					if cmds[1] == "2":
						poids = random.uniform(60.0, 63.0)
					if cmds[1] == "3":
						poids = random.uniform(85.0, 90.0)
					ts = time.time()
					dt = datetime.datetime.fromtimestamp(ts).isoformat()
					tosend = "{\n\"profil\":\"%s\",\n\"poids\":\"%0.1f\",\n\"date_iso8601\":\"%s\"\n}" % (cmds[1],poids,dt)
					client_sock.send(tosend)
				else:
					client_sock.send(tosend)
			else:
				client_sock.send(tosend)
		except IOError:
			pass

		logger.debug("disconnected")
		client_sock.close()

	logger.debug("all done")
	
    
#signal.signal(signal.SIGINT, signal_handler)
daemon = Daemonize(app="balance_app", pid=pid, action=main, keep_fds=keep_fds)
daemon.start()

[/collapse]

Une fois le serveur codé, je l'ai placé dans le répertoire /usr/local/bin/balance. Un coup de chmod +x pour le rendre exécutable et il reste à faire en sorte qu'il démarre avec le système.

Old School

La version old school repose sur un script de démarrage que l'on place dans /etc/init.d. Je nomme le fichier balance.sh. Je me suis basé sur un skeleton trouvé sur le web. Je l'ai adapté à mes besoins et je l'ai également chmod +x. Reste à créer les liens pour les différents runlevels.

# update-rc.d balance.sh defaults
 Adding system startup for /etc/init.d/balance.sh ...
   /etc/rc0.d/K20balance.sh -> ../init.d/balance.sh
   /etc/rc1.d/K20balance.sh -> ../init.d/balance.sh
   /etc/rc6.d/K20balance.sh -> ../init.d/balance.sh
   /etc/rc2.d/S20balance.sh -> ../init.d/balance.sh
   /etc/rc3.d/S20balance.sh -> ../init.d/balance.sh
   /etc/rc4.d/S20balance.sh -> ../init.d/balance.sh
   /etc/rc5.d/S20balance.sh -> ../init.d/balance.sh


Et voilà, maintenant, il démarre automatiquement dans le runlevel courant


$ runlevel
N 2
$ cat /var/run/balance.pid 
669
$ ps -A | grep 669
  669 ?        00:00:00 python

Le test

Depuis mon portable, je crée un script python simple, permettant de tester ma connexion et mon serveur.


#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import bluetooth
from random import randint

bd_addr = "XX:XX:XX:XX:XX:XX"

port = 1
try:
	sock=bluetooth.BluetoothSocket( bluetooth.RFCOMM )
	sock.connect((bd_addr, port))

	rnd = randint(1,3)
	toSend = "profil:%d" % rnd
	sock.send(toSend)
	data = sock.recv(1024)
	print "received [%s]" % data
	sock.close()
except IOError, e:
	print(e)


Ce qui donne:


druide@druide:~$ python client.py 
received [{
"profil":"3",
"poids":"88.7",
"date_iso8601":"2016-02-09T14:38:30.239851"
}]

Conclusion

L'old school à du bon dans le sens où on peut rapidement mettre en place un daemon écrit en python. La carte est prête, reste à écrire le cahier des charges 😞

  • 3 years 3 months before
  • |