import os, gc from .httpclient import HttpClient class OTAUpdater: """ A class to update your MicroController with the latest version from a GitHub tagged release, optimized for low power usage. """ def __init__(self, github_repo, github_src_dir='', module='', main_dir='main', new_version_dir='next', secrets_file=None, headers={}): self.http_client = HttpClient(headers=headers) self.github_repo = github_repo.rstrip('/').replace('https://git.msb-co.ir/', '') print('repo',self.github_repo) self.github_src_dir = '' if len(github_src_dir) < 1 else github_src_dir.rstrip('/') + '/' self.module = module.rstrip('/') self.main_dir = main_dir self.new_version_dir = new_version_dir self.secrets_file = secrets_file def __del__(self): self.http_client = None def check_for_update_to_install_during_next_reboot(self) -> bool: """Function which will check the GitHub repo if there is a newer version available. This method expects an active internet connection and will compare the current version with the latest version available on GitHub. If a newer version is available, the file 'next/.version' will be created and you need to call machine.reset(). A reset is needed as the installation process takes up a lot of memory (mostly due to the http stack) Returns ------- bool: true if a new version is available, false otherwise """ (current_version, latest_version) = self._check_for_new_version() if latest_version > current_version: print('New version available, will download and install on next reboot') self._create_new_version_file(latest_version) return True return False def install_update_if_available_after_boot(self, ssid, password) -> bool: """This method will install the latest version if out-of-date after boot. This method, which should be called first thing after booting, will check if the next/.version' file exists. - If yes, it initializes the WIFI connection, downloads the latest version and installs it - If no, the WIFI connection is not initialized as no new known version is available """ if self.new_version_dir in os.listdir(self.module): if '.version' in os.listdir(self.modulepath(self.new_version_dir)): latest_version = self.get_version(self.modulepath(self.new_version_dir), '.version') print('New update found: ', latest_version) OTAUpdater._using_network(ssid, password) self.install_update_if_available() return True print('No new updates found...') return False def install_update_if_available(self) -> bool: """This method will immediately install the latest version if out-of-date. This method expects an active internet connection and allows you to decide yourself if you want to install the latest version. It is necessary to run it directly after boot (for memory reasons) and you need to restart the microcontroller if a new version is found. Returns ------- bool: true if a new version is available, false otherwise """ (current_version, latest_version) = self._check_for_new_version() if latest_version > current_version: print('Updating to version {}...'.format(latest_version)) self._create_new_version_file(latest_version) self._download_new_version(latest_version) self._copy_secrets_file() self._delete_old_version() self._install_new_version() return True return False @staticmethod def _using_network(ssid, password): import network sta_if = network.WLAN(network.STA_IF) if not sta_if.isconnected(): print('connecting to network...') sta_if.active(True) sta_if.connect(ssid, password) while not sta_if.isconnected(): pass print('network config:', sta_if.ifconfig()) def _check_for_new_version(self): current_version = self.get_version(self.modulepath(self.main_dir)) latest_version = self.get_latest_version() print('Checking version... ') print('\tCurrent version: ', current_version) print('\tLatest version: ', latest_version) return (current_version, latest_version) def _create_new_version_file(self, latest_version): self.mkdir(self.modulepath(self.new_version_dir)) with open(self.modulepath(self.new_version_dir + '/.version'), 'w') as versionfile: versionfile.write(latest_version) versionfile.close() def get_version(self, directory, version_file_name='.version'): if version_file_name in os.listdir(directory): with open(directory + '/' + version_file_name) as f: version = f.read() return version return '0.0' def get_latest_version(self): print('https://git.msb-co.ir/api/v1/repos/{}/releases/latest'.format(self.github_repo)) latest_release = self.http_client.get('https://git.msb-co.ir/api/v1/repos/{}/releases/'.format(self.github_repo)) gh_json = latest_release.json() print(latest_release) try: version = gh_json[0]['tag_name'] except KeyError as e: raise ValueError( "Release not found: \n", "Please ensure release as marked as 'latest', rather than pre-release \n", "github api message: \n {} \n ".format(gh_json) ) latest_release.close() return version def _download_new_version(self, version): print('Downloading version {}'.format(version)) self._download_all_files(version) print('Version {} downloaded to {}'.format(version, self.modulepath(self.new_version_dir))) def _download_all_files(self, version, sub_dir=''): url = 'https://git.msb-co.ir/api/v1/repos/{}/contents/{}{}'.format(self.github_repo, self.github_src_dir, self.main_dir, sub_dir) print(url) gc.collect() file_list = self.http_client.get(url) file_list_json = file_list.json() for file in file_list_json: path = self.modulepath(self.new_version_dir + '/' + file['path'].replace(self.main_dir + '/', '').replace(self.github_src_dir, '')) if file['type'] == 'file': gitPath = file['path'] print('\tDownloading: ', gitPath, 'to', path) self._download_file(version, file['download_url'], path) elif file['type'] == 'dir': print('Creating dir', path) self.mkdir(path) self._download_all_files(version, sub_dir + '/' + file['name']) gc.collect() file_list.close() def _download_file(self, version, gitPath, path): self.http_client.get(gitPath, saveToFile=path) def _copy_secrets_file(self): if self.secrets_file: fromPath = self.modulepath(self.main_dir + '/' + self.secrets_file) toPath = self.modulepath(self.new_version_dir + '/' + self.secrets_file) print('Copying secrets file from {} to {}'.format(fromPath, toPath)) self._copy_file(fromPath, toPath) print('Copied secrets file from {} to {}'.format(fromPath, toPath)) def _delete_old_version(self): print('Deleting old version at {} ...'.format(self.modulepath(self.main_dir))) self._rmtree(self.modulepath(self.main_dir)) print('Deleted old version at {} ...'.format(self.modulepath(self.main_dir))) def _install_new_version(self): print('Installing new version at {} ...'.format(self.modulepath(self.main_dir))) if self._os_supports_rename(): os.rename(self.modulepath(self.new_version_dir), self.modulepath(self.main_dir)) else: self._copy_directory(self.modulepath(self.new_version_dir), self.modulepath(self.main_dir)) self._rmtree(self.modulepath(self.new_version_dir)) print('Update installed, please reboot now') def _rmtree(self, directory): for entry in os.ilistdir(directory): is_dir = entry[1] == 0x4000 if is_dir: self._rmtree(directory + '/' + entry[0]) else: os.remove(directory + '/' + entry[0]) os.rmdir(directory) def _os_supports_rename(self) -> bool: self._mk_dirs('otaUpdater/osRenameTest') os.rename('otaUpdater', 'otaUpdated') result = len(os.listdir('otaUpdated')) > 0 self._rmtree('otaUpdated') return result def _copy_directory(self, fromPath, toPath): if not self._exists_dir(toPath): self._mk_dirs(toPath) for entry in os.ilistdir(fromPath): is_dir = entry[1] == 0x4000 if is_dir: self._copy_directory(fromPath + '/' + entry[0], toPath + '/' + entry[0]) else: self._copy_file(fromPath + '/' + entry[0], toPath + '/' + entry[0]) def _copy_file(self, fromPath, toPath): with open(fromPath) as fromFile: with open(toPath, 'w') as toFile: CHUNK_SIZE = 512 # bytes data = fromFile.read(CHUNK_SIZE) while data: toFile.write(data) data = fromFile.read(CHUNK_SIZE) toFile.close() fromFile.close() def _exists_dir(self, path) -> bool: try: os.listdir(path) return True except: return False def _mk_dirs(self, path:str): paths = path.split('/') pathToCreate = '' for x in paths: self.mkdir(pathToCreate + x) pathToCreate = pathToCreate + x + '/' # different micropython versions act differently when directory already exists def mkdir(self, path:str): try: os.mkdir(path) except OSError as exc: if exc.args[0] == 17: pass def modulepath(self, path): return self.module + '/' + path if self.module else path