#! /usr/bin/python3 # =================================================================== # IoT MQTT Test Publisher # This simulates "something" controling IoT devices # =================================================================== import user_interface as ui import paho.mqtt.client as mqtt import sys broker = '192.168.1.77' topic = 'test/wifi/iot-01' # ---- menu selection, MQTT command selection = [ ('Blink LED 1','blink1'), ('Blink LED 2','blink2'), ('Blink LED 3','blink3'), ('Blink LED 4','blink4'), ('Blink Random LEDs','random'), ('Blink Cascade LEDs','cascade') ] # ------------------------------------------------------------------- # ---- main # ------------------------------------------------------------------- # ---- running Python3 if not ui.running_python3(): print() print('Must run Python3 - program exit') print() sys.exit() # ---- is the menu selection list too long? if len(selection) > 12: print() print('Error: Selection list is to long') print() sys.exit() # ---- create MQTT client and connect to broker ##print("Create client ...") client = mqtt.Client('iot_test_client_pub') ##print('Connect to broker ...') try: client.connect(broker) except Exception as e: print() print('Broker connection failed') ##if hasattr('e','message'): ## print(e.message) print() sys.exit() # ---- main menu loop l = len(selection) # number of selections # in selection list while True: ui.clear_screen() print('==================================================') print('Test My IoT devices') print(f' broker: {broker}') print(f' topic : {topic}') print('==================================================') print(' ## Description') print(' -- ------------------------------------------') # ---- display selections # ---- Note: 'i' is the selection number which is one more # ---- than the selection list index i = 1 while i <= l: print(' {} {}'.format(i,selection[i-1][0])) i += 1 # --- ask the user to make a selection print() sel = ui.get_user_input('Make selection: ') # --- validate selection if not sel: # no selection? break tf,isel = ui.is_int(sel) # get integer selection # tf - true/flag # isel - returned number if not tf: # selection is integer? print() print(f'Illegal selection entered ({sel})') ui.pause() continue idx = isel - 1 # convert selection to list index if idx < 0 or idx >= l: # selection is in range? print() print(f'Illegal selection entered ({sel})') ui.pause() continue # ---- publish command cmd = selection[idx][1] client.publish(topic,cmd) print(f'Publish {cmd}') ui.pause() # ---- exit program client.disconnect() print()