IoT on Your Raspberry Pi
APScheduler
The Advanced Python Scheduler (APScheduler) is a light but powerful in-process task scheduler that lets you schedule functions (or any other Python callables) to execute at times of your choosing.
I have used the APScheduler software for a number of projects in the past, such as Project Curacao [9], SunRover [10], the upcoming Project Curacao 2 project (I revisit wind power in the Caribbean with a second generation Project Curacao), and other SwitchDoc Labs projects for customers. It's a great package.
You can install it with the following two commands on your Raspberry Pi:
sudo pip install setuptools --upgrade sudo pip install apscheduler
A reading of the SunIOT software and examples show how it is used.
Installing the SunIOT Software
The source code for this project is on GitHub [11], and you can download your own copy by running:
git clone https://github.com/switchdoclabs/SunIOT.git
The SunIOT software for this column is pretty straightforward. The first part is shown in Listing 2. The first few lines import all the necessary libraries; then, the LED is mapped to D4 (GPIO 4) on the Pi2Grover board in lines 16-17, and lines 22-23 start the SI1145 I2C sensor.
Listing 2
SunIOT Initialization
01 # SunIOT - SwitchDoc Labs 02 # 03 # October 2016 04 # 05 import sys 06 import os 07 08 sys.path.append('./SDL_Pi_SI1145'); 09 10 import time 11 import RPi.GPIO as GPIO 12 13 #set up GPIO using BCM numbering 14 GPIO.setmode(GPIO.BCM) 15 16 LED = 4 17 GPIO.setup(LED, GPIO.OUT, initial=0) 18 19 from datetime import datetime 20 from apscheduler.schedulers.background import BackgroundScheduler 21 22 import SDL_Pi_SI1145 23 sensor = SDL_Pi_SI1145.SDL_Pi_SI1145()
The next section of code (Listing 3) shows the setup for three tasks to be scheduled. The first task just prints the time to the console (lines 3-4), the second task blinks the LED to show that certain events are happening (lines 11-16). By changing the timing on blinks and the number of blinks, you can indicate that a number of different events are happening. The third task is the real guts of the IoT device, where I read the SI1145 light sensors (19-21) and return the values to the mainline program (lines 30-34). I also convert the UV readings into the UV Index values here (line 22).
Listing 3
Setup Tasks
01 # setup apscheduler 02 03 def tick(): 04 print('Tick! The time is: %s' % datetime.now()) 05 06 def killLogger(): 07 scheduler.shutdown() 08 print "Scheduler Shutdown...." 09 exit() 10 11 def blinkLED(times,length): 12 for i in range(0, times): 13 GPIO.output(LED, 1) 14 time.sleep(length) 15 GPIO.output(LED, 0) 16 time.sleep(length) 17 18 def readSunLight(): 19 vis = sensor.readVisible() 20 IR = sensor.readIR() 21 UV = sensor.readUV() 22 uvIndex = UV / 100.0 23 print('SunLight Sensor read at time: %s' % datetime.now()) 24 print ' Vis: ' + str(vis) 25 print ' IR: ' + str(IR) 26 print ' UV Index: ' + str(uvIndex) 27 28 blinkLED(2,0.200) 29 30 returnValue = [] 31 returnValue.append(vis) 32 returnValue.append(IR) 33 returnValue.append(uvIndex) 34 return returnValue
Listing 4 shows the main program, which has the main loop and is where I set up APScheduler to execute the tasks (line 25). The three tasks are the tick
process, which prints the time to the console every 60 seconds (line 17), and the blinkLED
and IoT readSunLight
processes, which give a heartbeat every five seconds (line 19) and 10 seconds (line 22), respectively.
Listing 4
Main Program
01 print "-----------------" 02 print "SunIOT" 03 print "" 04 print "SwitchDoc Labs" 05 print "-----------------" 06 print "" 07 08 if __name__ == '__main__': 09 scheduler = BackgroundScheduler() 10 11 # DEBUG Mode - because the functions run in a separate thread, debugging can be difficult inside the functions. 12 # run the functions here to test them. 13 #tick() 14 #print readSunLight() 15 16 # print out the date and time to console 17 scheduler.add_job(tick, 'interval', seconds=60) 18 # blink life light 19 scheduler.add_job(blinkLED, 'interval', seconds=5, args=[1,0.250]) 20 21 # IoT Jobs are scheduled here (more coming next issue) 22 scheduler.add_job(readSunLight, 'interval', seconds=10) 23 24 # start scheduler 25 scheduler.start() 26 print "-----------------" 27 print "Scheduled Jobs" 28 print "-----------------" 29 scheduler.print_jobs() 30 print "-----------------" 31 32 print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) 33 34 try: 35 # This is here to simulate application activity (which keeps the main thread alive). 36 while True: 37 time.sleep(2) 38 except (KeyboardInterrupt, SystemExit): 39 # Not strictly necessary if daemonic mode is enabled but should be done if possible 40 scheduler.shutdown
Note that I'm not really doing any IoT work in this example yet. However, I have built a data-gathering device, so in my next column I will be hooking it up to the IoT and the world.
Buy this article as PDF
Pages: 6
(incl. VAT)