Images to video (sci. vis. util)

TL;DR; Summary & Code

Use the Python snippet below to create an mp4/webm video from images in a path that all share a prefix (which can also be an empty string, of course). It requires ffmpeg to be set up so that it can be found on the command line (there are ways to use the Python version too, but this way you can also simply create a link to the standalone version if you are on *nix). The gist version on my GitHub should always be up to date.

Python
# -*- coding: utf-8 -*-
"""
At some point in time

@author: elbarto
"""

def img2vid(path,
            prefix,
            moviename='auto',
            movietype='mp4',
            outrate=15,
            inrate=15,
            imgtype='auto',
            width=1280,
            height=960,
            preset='fast',
            quiet=True,
            ):
    """
    Create a movie (mp4 or webm) from a series of images from a folder.

    Parameters
    ----------
    path : str
        Where the images are located.
    prefix : str
        The prefix the images have, e.g., img_XX.png
    moviename : str
        The name of the movie that should be written out, e.g., movie_XX.mp4
        The default is movie_XX.mp4 where XX is checked to avoid overwriting.
    movietype : str
        The format of the movie to be created e.g., mp4 or webm
        The default is mp4, see also parameter moviename.
    outrate : int, optional
        The framerate of the output video. The default is 15.
    inrate : int, optional
        The framerate of the input. The default is 15.
    imgtype : str, optional
        The imagetype to use as input. The default is 'auto',
        which means that jpg, jpeg, png, gif are looked at and collected
    width : int, optional
        The width of the output video. The default is 1280.
    height : int, optional
        The height of the output video. The default is 960.
    preset : str, optional
        The preset for video creation, determining the creation speed.
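        The default is 'fast', other options are veryfast, medium, slow...
    quiet : bool, optional
        Whether to suppress progress output on stdout. The default is True.

    Returns
    -------
    str, False or None
        The path of the movie, False if the image format could not be
        detected, or None if no matching files were found.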

    """
    import os
    import subprocess

    # cheap implementation of natsort to avoid dependency
    def natsorted(listlike):
        import re
        convert = lambda x: int(x) if x.isdigit() else x.lower()
        alphanum_key = lambda key: [convert(c) 
                                    for c in re.split('([0-9]+)', key)]
        return sorted(listlike, key=alphanum_key)

    # for convenience move to the path where the images are located
    # will change at the end to the original path again
    curdir = os.path.abspath(os.curdir)
    # make the path absolute so it stays valid after changing directory
    path = os.path.abspath(path)
    os.chdir(path)

    filelist = []

    for entry in os.scandir(path):
        if (not entry.name.startswith('.')
           and entry.is_file()
           and entry.name.startswith(prefix)):
            pass
        else:
            continue

        if imgtype == 'auto':
            imgtypes = ['png', 'jpeg', 'jpg', 'gif']
            chk = [entry.name.lower().endswith(_) for _ in imgtypes]
            if max(chk):
                filelist.append(entry.name)
                _imgtype = [_
                            for _ in imgtypes
                            if entry.name.lower().endswith(_)]
        else:
            if entry.name.lower().endswith(imgtype):
                filelist.append(entry.name)

    filelist = natsorted(filelist)

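    if imgtype == 'auto' and filelist:
        # collect every extension that occurs among the matched files
        _imgtype = sorted({name.rsplit('.', 1)[-1].lower()
                           for name in filelist})
        if len(_imgtype) != 1:
            print('Issues with autodetection of image format.',
                  'We found the formats', _imgtype,
                  'Please pass in type directly via imgtype=...')
            os.chdir(curdir)
            return False
        imgtype = _imgtype[0]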

    if filelist == []:
        print('No files found with these parameters')
        # go back to where we started before leaving the function
        os.chdir(curdir)

    else:

        if not imgtype.startswith('.'):
            imgtype = '.' + imgtype

        cmd = "ffmpeg -r "
        cmd += f'{inrate} '
        cmd += " -f concat "

        tmpfile = 'temp_filelist.txt'
        # we already changed into the image folder, so a bare name suffices
        with open(tmpfile, 'w') as fo:
            for file in filelist:
                fo.write('file ' + (file).replace('/', "\\") + '\n')

        cmd += f' -i {tmpfile}'
        cmd += ' -vcodec libx264'
        cmd += f' -preset {preset} '
        cmd += '-pix_fmt yuv420p -r '
        cmd += str(outrate)
        cmd += ' -y -s ' + f'{width}x{height} '

        # may be an issue if you have 1382195208752376502350 movie files in
        # the same folder which we hope is unlikely!
        # the check is relative to the image folder we changed into above
        startnumber = 0
        while os.path.exists(f'movie_{startnumber}.{movietype}'):
            startnumber += 1

        if moviename == 'auto':
            moviename = f'movie_{startnumber}.{movietype}'

        cmd += moviename

        try:
            if not quiet:
                print('Calling', cmd)
            subprocess.check_call(cmd.split())
            print(f'Successfully made movie {os.path.join(path, moviename)}')
        except (subprocess.CalledProcessError, FileNotFoundError):
            # FileNotFoundError is raised if ffmpeg is not on the command line
            print('Calling ffmpeg failed!',
                  'Make sure it is installed on your system via conda/pip/...')
        finally:
            # clean up while still in the image folder, then go back
            os.remove(tmpfile)
            os.chdir(curdir)

        return os.path.join(path, moviename)

Background & motivation

Who doesn’t know it? You have to give a talk, illustrate your findings or simply want to show something extra on your poster at a conference with a tablet or linked via QR code. Now you can upload your image sequence to many online pages that will convert it into a format of your choice. After you made the video, you notice a mistake in the images and you have to redo it – maybe more than once even. If you want several videos, repeat the process even more often.

Instead, you could use video software suites that render the images into videos, but this is essentially the same tedious process and often requires you to learn the software (which has its own merit but maybe you are lacking the time). Why not program it instead?

The requirements are actually quite easy to meet, especially if we are using ffmpeg and Python. This requires you to have set up ffmpeg so that it can be found on the command line.

Development process

To simplify the process, let’s look at the requirements for the function that were important for me at the time:

  • Call ffmpeg on the command line
  • Name the movie and do not overwrite existing movies
  • Pass in a path of images or a list of files (handy if you store the output from another script)
  • Which kind of movie to make (mp4 is usually the most compatible, but webm is also useful when making videos for the web/browsers – I tend to go with mp4 for PowerPoint presentations, but webm is also supported by MS Office 365 nowadays)
  • How fast the movie should play (in/outrate)
  • Which image type the images are (jpeg, jpg, png and gif are fine)
  • The dimensions the video should have (width, height); the snippet above defaults to 1280x960
  • How the rendering by ffmpeg should be done (fast is usually good enough quality, there is a tradeoff, see documentation of ffmpeg)
  • Whether to report some progress during the making – aka the quiet option
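Several of these requirements can be sketched in isolation. The following is a minimal sketch, not the gist itself: the helper names build_ffmpeg_command and next_free_name are made up for illustration, and only ffmpeg options that already appear in the snippet above are used. Building the call as a list of arguments (instead of splitting a string) means file names with spaces survive.

```python
import os
import tempfile


def build_ffmpeg_command(listfile, moviename, inrate=15, outrate=15,
                         width=1280, height=960, preset='fast'):
    """Assemble the ffmpeg call as a list of arguments (hypothetical helper)."""
    return ['ffmpeg',
            '-r', str(inrate),        # rate at which the images are read
            '-f', 'concat',           # read the inputs from a list file
            '-i', listfile,
            '-vcodec', 'libx264',
            '-preset', preset,
            '-pix_fmt', 'yuv420p',
            '-r', str(outrate),       # frame rate of the resulting video
            '-y',                     # do not ask before writing the output
            '-s', f'{width}x{height}',
            moviename]


def next_free_name(folder, movietype='mp4'):
    """Return the first movie_N.<movietype> that does not exist in folder."""
    number = 0
    while os.path.exists(os.path.join(folder, f'movie_{number}.{movietype}')):
        number += 1
    return f'movie_{number}.{movietype}'


# a name with a space stays one argument because nothing is split later
cmd = build_ffmpeg_command('temp_filelist.txt', 'my movie.mp4')

# demonstrate the "do not overwrite" rule in a throwaway folder
with tempfile.TemporaryDirectory() as folder:
    open(os.path.join(folder, 'movie_0.mp4'), 'w').close()
    free_name = next_free_name(folder)
```

The resulting list could be handed to subprocess.check_call directly, without calling split on it.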

Some things to consider are:

  • You could use natsorted, to get a natural sort of the files as that is usually how we humans would sort them. Usually, this makes little difference but natural sorting works better with mixed naming conventions (0001, 0010, 0100 … vs 1, 10, 100 …). Instead of another dependency, a cheap natsort is implemented as well. Replace the function if you actually have natsort installed and want to use it instead.
  • Instead you can also directly run the video command via ffmpeg on the command line – this is just a thin wrapper to keep some default options in place that made sense to me. You could also write files out to a file and load them via ffmpeg instead …
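To illustrate the difference natural sorting makes, here is a small sketch along the lines of the cheap natsort in the snippet above. The file names are made up for the example and natural_key is just an illustrative name.

```python
import re


def natural_key(name):
    """Split a name into text and number chunks so numbers compare as ints."""
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r'([0-9]+)', name)]


names = ['img_10.png', 'img_2.png', 'img_1.png', 'img_100.png']

# plain string sorting compares character by character
plain = sorted(names)
# natural sorting compares the embedded numbers as numbers
natural = sorted(names, key=natural_key)

print(plain)    # ['img_1.png', 'img_10.png', 'img_100.png', 'img_2.png']
print(natural)  # ['img_1.png', 'img_2.png', 'img_10.png', 'img_100.png']
```

With zero-padded names (0001, 0010, 0100) both orderings agree, which is why the difference usually only shows up with unpadded numbering.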

Other than that, the process is straightforward. Pass in your directory and the prefix that the images might have (tune some things if you want to). Otherwise, enjoy your video making and as a teaser, the following timelapse is made via the above script on a regular basis and linked here

Further reading and resources

Conversion swisstopo, CH1903 (LV95/LV03) and WGS84

TL;DR; Summary and code

Pass in either latitude/longitude to wgs84_to_ch1903 (which by default converts to CH1903+) or “Hochwert” (x, the northing) and “Rechtswert” (y, the easting) to ch1903_to_wgs84 (which detects whether it is CH1903+ based on the length/value of the passed digits). The most recent version is available as a GitHub gist from me.

Python
import numpy as np

def deci2sexa(angle):

    angle = np.asarray(angle)
    # Extract DMS
    degrees = angle.astype(int)
    minutes = ((angle - degrees) * 60).astype(int)
    seconds = (((angle-degrees)*60)-minutes)*60
    # Result sexagesimal seconds
    return seconds + minutes * 60.0 + degrees * 3600.0

def wgs84_to_ch1903(lat, lon, plus=True):

    lat, lon = deci2sexa(lat), deci2sexa(lon)
    # Auxiliary values (% Bern)
    lat_aux = (lat - 169028.66) / 10000
    lng_aux = (lon - 26782.5) / 10000

    x = (200147.07 +
         308807.95 * lat_aux  +
         3745.25 * np.power(lng_aux, 2) +
         76.63 * np.power(lat_aux, 2) -
         194.56 * np.power(lng_aux, 2) * lat_aux +
         119.79 * np.power(lat_aux, 3))

    y = (600072.37 +
         211455.93 * lng_aux -
         10938.51 * lng_aux * lat_aux -
         0.36 * lng_aux * np.power(lat_aux, 2) -
         44.54 * np.power(lng_aux, 3))

    if plus:
        x += 1000000
        y += 2000000

    return x, y
    
def ch1903_to_wgs84(x, y, plus='auto'):

    if plus == 'auto':
        # LV95 adds 1000000 to x and 2000000 to y, LV03 values stay below that
        if np.nanmax(x) > 1000000 or np.nanmax(y) > 2000000:
            plus = True
        else:
            plus = False

    #  Auxiliary values (% Bern)
    y_aux = (y - 600000)/1000000 # would be 2600000 for ch1903plus
    x_aux = (x - 200000)/1000000 # would be 1200000 for ch1903plus

    if plus:
        x_aux -= 1 # new ch1903plus system has another digit to distinguish it
        y_aux -= 2 # new ch1903plus system has another digit to distinguish it
    # Process lat
    lat = (16.9023892 +
           3.238272 * x_aux -
           0.270978 * np.power(y_aux, 2) -
           0.002528 * np.power(x_aux, 2) -
           0.0447 * np.power(y_aux, 2) * x_aux -
           0.0140 * np.power(x_aux, 3))

    # Process lng
    lon = (2.6779094 +
           4.728982 * y_aux +
           0.791484 * y_aux * x_aux +
           0.1306 * y_aux * np.power(x_aux, 2) -
           0.0436 * np.power(y_aux, 3))

    # Unit 10000" to 1 " and converts seconds to degrees (dec)
    lon = lon * 100 / 36
    lat = lat * 100 / 36
    return lat, lon

Quite often I find myself having to convert WGS84 to CH1903 coordinate systems and vice versa. Sometimes I even have neither and simply have a center point and some distance (looking at you, ground-based remote sensing data). While swisstopo used to have (in 2017 or so) a library to download, the current easiest way is to actually use the GitHub repo from Valentin Minder, which contains converters for several programming languages.

However, the repo contains classes (which are great of course) but I often prefer a direct function (which in Python is also a class, but well …). As such, I used the same formulas you can find elsewhere from swisstopo to do the calculation. Since I usually do not care that much about the altitude in these cases, there is no option (as of yet) to include it. One upside of this version of the conversion is that it is array-enabled, i.e. the respective coordinates can be either a single scalar or a numpy array (not a list though ;-)).
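As a rough illustration of what array-enabled means here, the first step of the conversion (decimal degrees to sexagesimal seconds) can be written so that it accepts scalars and arrays alike. This is only a sketch: the name degrees_to_arcseconds is made up for the example, and the input is passed through np.asarray with a float dtype, which is an addition for convenience so that a plain list works in this step too.

```python
import numpy as np


def degrees_to_arcseconds(angle):
    """Convert decimal degrees to sexagesimal seconds (scalar or array)."""
    angle = np.asarray(angle, dtype=float)
    degrees = np.trunc(angle)
    minutes = np.trunc((angle - degrees) * 60)
    seconds = ((angle - degrees) * 60 - minutes) * 60
    return seconds + minutes * 60.0 + degrees * 3600.0


# a single latitude and several latitudes go through the same code path
single = degrees_to_arcseconds(46.5)
several = degrees_to_arcseconds([46.5, 47.25])

# auxiliary value relative to Bern, using the constant from the function above
lat_aux = (several - 169028.66) / 10000
```

Because every operation is element-wise, the polynomial terms that follow in the conversion work on lat_aux without any loop.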

Further reading

geo utils – get altitude profile data from swisstopo

Summary & code

Use the swisstopo API to get a profile of altitudes with the function get_swisstopo_elevation_profile by passing coordinates aka “Rechtswert” and “Hochwert” as array/list (any iterable should do). The gist on GitHub should always be the most up-to-date version.

Python
def get_swisstopo_elevation_profile(coords,  # a path of 2+ points, each in the form [easting, northing]
                                    kind='csv',
                                    # three heights are available for JSON
                                    # COMB, DTM2, DTM25
                                    which='COMB',
                                    asnumpy=True,
                                    quiet=True,
                                    opts={"sr": None,
                                          "nb_points": None,
                                          "offset": None,
                                          "distinct_points": True,
                                          "callback": None,
                                          }
                                    ):
    """
    Call the swisstopo API for altitude data along a path.

    Pass in a list or array of coordinates in EPSG 2056 (LV95) or 21781 (LV03)
    to get altitude values along the provided path. For options see keywords or
    parameters of call via swisstopo API.

    Parameters
    ----------
    coords : list or numpy array
        The coordinates in either EPSG 2056 (LV95) or EPSG 21781 (LV03).
    kind : str, optional
        Which API backend should be queried. Available are json and csv.
        If none of these are passed properly, fallback is csv.
        The default is 'csv' (easier structure to parse).
    which: str
        If kind is json, three altitude values are available, DTM2, DTM25 and
        COMB(INATION).
    asnumpy : bool, optional
        Whether to return a numpy array.
        The default is True.
    quiet : bool, optional
        Whether to quiet the output (True) or not.
        The default is True.
     opts: dict
        Further keywords than can be passed to the API call, see
        https://api3.geo.admin.ch/services/sdiservices.html#profile

    Returns
    -------
    list or numpy array
        The returned points, in the form of distance along path, altitude,
        coordinates.

    """

    import requests
    import numpy as np

    if len(coords) > 5000:
        print('Warning, number of coordinates exceeds swisstopo API '
              'max number of points, reduce coords beforehand.')
        return np.asarray([]) if asnumpy else []

    payload = 'geom={"type":"LineString","coordinates":['
    payload += ','.join([f"[{coord[0]},{coord[1]}]"
                         for coord in coords])
    payload += ']'
    opts = ','.join(['"'+str(key)+'"'+':'+'"'+str(opt)+'"'
                     for key, opt in opts.items()
                     if opt is not None])
    if opts:
        payload += ',' + opts
    payload += '}'
    kind = kind.lower()
    if kind.lower() not in ['csv', 'json']:
        if not quiet:
            print('Only csv or json can be chosen, autoselecting csv')
        kind = 'csv'

    baseurl = "https://api3.geo.admin.ch/rest/services/profile." + kind
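    try:
        profile = requests.get(baseurl, params=payload)
    except requests.exceptions.RequestException:
        # requests raises its own exception types, not the builtin
        # ConnectionError, for connection problems and timeouts
        if not quiet:
            print('Connection failed')
        return np.asarray([]) if asnumpy else []

    if profile.ok:
        if not quiet:
            print('Success')
    else:
        if not quiet:
            print('Failed')
        # nothing sensible to parse if the API reported an error
        return np.asarray([]) if asnumpy else []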

    if kind == 'csv':

        profile = profile.text.split('\r\n')
        # Distance, Altitude, Easting, Northing -> not needed
        # header = profile[0]
        profile = list(map(lambda x: [float(j.strip('"'))
                                      for j in x.split(';')],
                           profile[1:-1]))
    elif kind == 'json':

        profile = [[p['dist'], p['alts'][which],
                    p['easting'], p['northing']]
                   for p in profile.json()]

    if asnumpy:
        profile = np.asarray(profile)

    return profile


if __name__ == '__main__':
    # straightforward example usage
    rw=[2611025.0, 2620975.0, 2633725.0]
    hw=[1266400.0, 1256750.0, 1250000.0]
    profile = get_swisstopo_elevation_profile(list(zip(rw, hw)))
    import matplotlib.pyplot as plt
    plt.fill_between(profile[:,0], 
                     profile[:,1], 
                     facecolor='grey',
                     edgecolor='k',
                     alpha=0.8)
    plt.ylim(min(profile[:,1]), None)

Background/motivation

For the CLOUDLAB project we regularly get forecasts that contain a profile at the bottom of the figure from MeteoSwiss that illustrates the terrain nicely (albeit, sometimes it is missing for unknown reasons):

Similarly, I produced a lot of figures of our scans and depending on azimuth and elevation they may be affected by ground clutter, e.g. at a distance of 2 km the increased reflectivity is caused by the terrain below (the hill at around 800 m above sea level). As you can see, I added the terrain already.

Initially, I thought I’d have to get the actual DEM for the area, find the path matching the scan direction and calculate the profile myself. While this might actually be a bit more accurate with a good DEM, it would be more work, not be transferable and mean I’d have to have extra data around. Instead, I realized that swisstopo offers the measure tool on map.geo.admin.ch and uses their own API with a matching request. So I chose to use this, as I’m already adding the coordinates for the scan and it is relatively straightforward (min/max of lowest elevation scanline) to get the path we need. The main issue I faced was a misformatted quote that came from copy-pasting the swisstopo example, which I only figured out after a detailed URL decode and string comparison. With that fixed, the function is relatively lean and can use either the JSON or the CSV backend of the API. By default it returns a numpy array, which can be visualized with ease and can look as follows:
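One way to sidestep quote problems like the one described above is to let the standard library produce and encode the JSON instead of typing the quotes by hand. The following is only a sketch and not what the gist does: build_profile_query is a made-up name, and passing options such as sr as separate query parameters is an assumption that should be checked against the API documentation linked in the docstring.

```python
import json
from urllib.parse import parse_qs, urlencode


def build_profile_query(coords, **options):
    """Build the query string for a profile request (hypothetical helper)."""
    geom = {"type": "LineString",
            "coordinates": [[float(e), float(n)] for e, n in coords]}
    # json.dumps guarantees plain ASCII double quotes in the right places
    query = {"geom": json.dumps(geom, separators=(',', ':'))}
    # assumption: further options travel as their own query parameters
    query.update({key: value for key, value in options.items()
                  if value is not None})
    return urlencode(query)


path = [(2611025.0, 1266400.0), (2620975.0, 1256750.0)]
query = build_profile_query(path, sr=2056, nb_points=None)

# decoding the query again is a quick way to compare against a known example
decoded = parse_qs(query)
geometry = json.loads(decoded['geom'][0])
```

Decoding a hand-built query and one that is known to work with parse_qs makes stray typographic quotes easy to spot.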

I hope this helps someone to get a profile to enrich their Python (or other) graphs when measuring in Switzerland. In the likely case that you aren’t doing something in Switzerland, it might be worthwhile to check out the Google Maps Elevation API (for which you need an API key and billing enabled).

Download data from the Lufft CHM 15k ceilometer webinterface

TL;DR; Summary

Download all newer data from the web interface of the Lufft CHM15k ceilometer into a main directory or directories according to Year/Month/Day format with the code below. An up-to-date version can also be found at my GitHub gists. Change according to your needs (esp. the format of the subfolders).
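Since the subfolder format is the part most likely to need changing, here is a small standalone sketch of that logic before the full listing. It is not the class method itself: the function names and the example file names are hypothetical, and the only thing taken from the code below is the idea that the part of the file name before the first underscore is a YYYYMMDD (netCDF) or YYYYMM (zip) stamp.

```python
import os


def date_parts(filename):
    """Return (year, month, day) or (year, month) from a leading date stamp."""
    stamp = os.path.basename(filename).split('_')[0]
    if len(stamp) == 8 and stamp.isdigit():
        return stamp[:4], stamp[4:6], stamp[6:8]
    if len(stamp) == 6 and stamp.isdigit():
        # monthly archives carry no day
        return stamp[:4], stamp[4:6]
    return ()


def date_subfolder(filename, prefixes=('Y', 'M', 'D')):
    """Join the date parts into a relative folder, '' if no date was found."""
    parts = date_parts(filename)
    if not parts:
        return ''
    return os.path.join(*[p + v for p, v in zip(prefixes, parts)])


# hypothetical file names, only the leading date stamp matters here
daily = date_subfolder('20210915_station_000.nc')
monthly = date_subfolder('202109_station.zip')
plain = date_subfolder('20210915_station_000.nc', prefixes=('', '', ''))
```

Passing empty prefixes gives plain year/month/day folders, which is one example of adapting the layout.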

Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 15 09:22:30 2021

@author: spirrobe
"""

import os
import datetime
import requests
import json


class chm15ksession(requests.Session):
    """
    A class for interacting with the CHM-15k data server.

    This class inherits from the requests.Session class and is designed to
    facilitate downloading netCDF and zipped netCDF files from the CHM-15k.
    To use this class, you must have a valid password for accessing the
    server.

    Parameters
    ----------
    url : str
        The URL of the CHM-15k ceilometer. Can be a local IP or an http URL.
    password : str, optional
        The password for accessing the CHM-15k.
        Default is "15k-Nimbus".
    outpath : str, optional
        The path to save downloaded files to. Default is the current directory.
    download2subdirs : bool, optional
        Whether to put files into a subdirectory as
        outpath/Y{year}/M{month}/D{day}, where year, month and day are
        inferred for each file based on the filename. Default is False.
    quiet : bool, optional
        Whether to suppress information about the download progress.
        Default is True.

    Attributes
    ----------
    url : str
        The URL of the CHM-15k.
    session : requests.Session
        The requests session object used to communicate with the server.
    password : str
        The password for accessing the CHM-15k data server.
    outpath : str
        The path to save downloaded files to.
    filecount : int
        The number of files available on the server.
    quiet : bool
        Whether to suppress information about the download progress.
    sessionid : str
        The ID of the current session with the server.
    zipfiles : list of str
        The names of the zipped netCDF files available on the server.
    zipsizes : list of int
        The sizes of the zipped netCDF files available on the server, in bytes.
    ncfiles : list of str
        The names of the netCDF files available on the server.
    ncsizes : list of int
        The sizes of the netCDF files available on the server, in bytes.

    Methods
    -------
    connect()
        Connects to the CHM-15k data server and establishes a session.
    getfilelist()
        Returns a dictionary of available netCDF and zipped netCDF files on the
        CHM-15k data server.
    getncfiles(overwrite=False)
        Downloads all available netCDF files from the CHM-15k to the
        local file system.
    getzipfiles(overwrite=False)
        Downloads all available zipped netCDF files from the CHM-15k
        to the local file system.
    """

    def __init__(self,
                 url,
                 password="15k-Nimbus",
                 outpath='./',
                 download2subdirs=False,
                 timeout=20,
                 quiet=True,
                 *args, **kwargs,
                 ):
        """
        Initialize a new instance of the chm15ksession class.

        Parameters
        ----------
        url : str
            The URL of the CHM-15k.
        password : str, optional
            The password for accessing the CHM-15k data server. Default is
            "15k-Nimbus".
        outpath : str, optional
            The path to save downloaded files to.
            Default is the current directory.
        timeout : int or float, optional
            The timeout in seconds for the get calls, adjust if on low
            bandwidth/slow network. Default is 20.
        quiet : bool, optional
            Whether to suppress information about the download progress.
            Default is True.
        """
        super().__init__(*args, **kwargs)
        # assert url, str, 'url must be a str'
        self.timeout = timeout
        self.url = url

        if not self.url.endswith('/'):
            self.url += '/'

        if not self.url.startswith('http'):
            self.url = 'http://' + self.url

        self.__cgi = "cgi-bin/chm-cgi"
        self.__cgiurl = self.url + self.__cgi
        #self.session = requests.Session()
        #self = requests.Session()
        self.password = password
        self.outpath = outpath
        self.__subpath = ''
        self.download2subdirs = download2subdirs
        if not self.outpath.endswith(os.sep):
            self.outpath += os.sep

        self.filecount = None
        self.sessionid = None
        self.zipfiles = []
        self.zipsizes = []
        self.ncfiles = []
        self.ncsizes = []
        self.quiet = quiet

    def _filename2date(self, filename):
        # pattern is YYYYMMDD
        _ = filename.split(os.sep)[-1].split('_')[0]
        if len(_) == 8:
            # typical netcdf files
            return _[:4], _[4:4+2], _[4+2:4+2+2]
        elif len(_) == 6:
            # zipfiles do not have a day as they are for the month
            return _[:4], _[4:4+2]
        else:
            print(f'Date could not be inferred from {filename}')
            return '', '', ''

    def _filename2datefolder(self, filename):
        date = self._filename2date(filename)
        if date[0]:
            date = [s + i for s, i in zip(['Y','M','D'], date)]
            date = os.sep.join(date) + os.sep

            if not self.outpath.endswith(os.sep):
                date = os.sep + date
            return date
        else:
            return ''

    def connect(self):
        """
        Connect to the CHM-15k using the provided password.

        This method sends a validation request to the CHM-15k data server
        with the provided password and obtains a session ID that can be
        used for subsequent requests.

        Raises
        ------
        requests.exceptions.RequestException
            If the request fails.

        """
        validationurl = self.__cgiurl+f"?validatetoken&code={self.password}"
        # this url could be used to check if the connection worked
        # checkurl = self.__cgiurl+"?checkvalidation"
        try:
            resp = self.get(validationurl, timeout=self.timeout)
        except requests.exceptions.RequestException:
            now = datetime.datetime.now(datetime.UTC)
            print(f'{now}: Connection failed, check url {self.url} and '
                  f'password {self.password}')
            return
        sessionid = resp.text.strip().split('{')[1].split('}')[0]
        resp.close()
        sessionid = sessionid.split(':')[1].split(',')[0]
        self.sessionid = sessionid
        self.cookies.set("session", self.sessionid,
                                 domain=self.url.split(':')[1][2:])
        if not self.quiet:
            now = datetime.datetime.now(datetime.UTC)
            print(f'{now}: Connection successful to {self.url}')

    def getfilelist(self):
        """
        Get a list of files from the CHM-15k.

        If the connection to the server has not been established,
        this method will establish a connection. Sets attributes of the
        object to contain the return values as well.

        Returns
        -------
        dict
            A dictionary containing the following keys:
            - 'zipfiles': A list of the names of zipped netCDF files.
            - 'netcdffiles': A list of the names of netCDF files.
            - 'zipsizes': A list of the sizes of zipped netCDF files.
            - 'ncsizes': A list of the sizes of netCDF files.
        """
        if self.sessionid:
            pass
        else:
            self.connect()
        resp = self.get(self.__cgiurl + '?filelist', timeout=self.timeout)
        filelist = resp.text
        resp.close()
        filelist = filelist[filelist.index('{'):]
        filelist = filelist[:-filelist[::-1].index('}')]
        try:
            filelist = json.loads(filelist)
        except json.JSONDecodeError:
            if not self.quiet:
                now = datetime.datetime.now(datetime.UTC)
                print(f'{now}: Issue with getting proper filelist, '
                      'aborting getfilelist and potential callers')
            return None
        self.filecount = filelist['count']
        self.zipfiles = [i[0] for i in filelist["ncfiles"] if 'zip' in i[0]]
        self.zipsizes = [i[1] for i in filelist["ncfiles"] if 'zip' in i[0]]

        self.ncfiles = [i[0] for i in filelist["ncfiles"] if 'zip' not in i[0]]
        self.ncsizes = [i[1] for i in filelist["ncfiles"] if 'zip' not in i[0]]

        if not self.quiet:
            now = datetime.datetime.now(datetime.UTC)
            print(f'{now}: Found {filelist["count"]} files in total to be checked')
            print(f'{now}: Found {len(self.ncfiles)} netCDF files')
            print(f'{now}: Found {len(self.zipfiles)} zipped netCDF files')

        return {'zipfiles': self.zipfiles, 'netcdffiles': self.ncfiles,
                'zipsizes': self.zipsizes, 'ncsizes': self.ncsizes}

    def getsinglefile(self, filename, overwrite=True):
        """
        Download a single file from the CHM15k to the specified output path.

        Parameters
        ----------
        filename : str
            Name of the file to be downloaded. Can be either zip or nc file.
        overwrite : bool, optional
            Flag indicating whether to overwrite the file if it already
            exists in the output path and has the same size.
            Defaults to True.

        Returns
        -------
        None
            If the file is not available on the server or
            if the file transfer fails.

        Raises
        ------
        None

        Notes
        -----
        This method uses the requests library to download the file
        from the server, and saves it to the output path using
        the same filename as on the device.

        """
        if self.filecount:
            pass
        else:
            self.getfilelist()

        if filename not in self.ncfiles and filename not in self.zipfiles:
            print(f'File {filename} not available')
            return
        else:
            if filename in self.ncfiles:
                filesize = self.ncsizes[self.ncfiles.index(filename)]
            elif filename in self.zipfiles:
                filesize = self.zipsizes[self.zipfiles.index(filename)]
            else:
                print(f'File {filename} not available')
                return

        if self.download2subdirs:
            self.__subpath = self._filename2datefolder(filename)

        os.makedirs(self.outpath + self.__subpath, exist_ok=True)

        # check if the file exists, and if it does has the same size
        # if so continue
        if os.path.exists(self.outpath + self.__subpath + filename):
            fs = os.path.getsize(self.outpath + self.__subpath + filename) // 1024
            if fs == filesize and not overwrite:
                if not self.quiet:
                    print(f'File {filename} already exists and has the same '
                          'size as the file on the CHM15k. Pass overwrite to',
                          'download anyway')

                return

        filecontent = self.get(self.__cgiurl+'/'+filename+"?getfile", timeout=self.timeout)
        # check if the transfer worked in the firstplace, if not continue
        if filecontent.status_code != 200:
            if not self.quiet:
                now = datetime.datetime.now(datetime.UTC)
                print(f'{now}: Filetransfer failed for {filename}')
            return

        with open(self.outpath + self.__subpath + filename, 'wb') as fo:
            fo.write(filecontent.content)

        if not self.quiet:
            now = datetime.datetime.now(datetime.UTC)
            print(f'{now}: Successfully downloaded {filename}')

        self.__subpath = ''

    def getncfiles(self, overwrite=False):
        """
        Download netCDF files from the CHM-15k to the specified `outpath`.

        Parameters
        ----------
        overwrite : bool, optional
            Whether to overwrite existing files with the same name and size
            in the `outpath`.
            Default is False.

        Raises
        ------
        None

        Notes
        -----
        This method first checks whether the `filecount` attribute is set.
        If not, it calls the `getfilelist` method to obtain a list of files
        available for download. Then, for each netCDF file in the list,
        it checks whether the file already exists in the `outpath` and has
        the same size as the file.
        If not, it downloads the file using a GET request and saves it
        to the `outpath`.

        """
        if self.filecount:
            pass
        else:
            self.getfilelist()

        dlcount = 0
        for fileno, (filename, filesize) \
                in enumerate(zip(self.ncfiles, self.ncsizes)):
            if self.download2subdirs:
                self.__subpath = self._filename2datefolder(filename)
            # check if the file exists, and if it does has the same size
            # if so continue
            if os.path.exists(self.outpath + self.__subpath + filename):
                fs = os.path.getsize(self.outpath + self.__subpath + filename) // 1024
                if fs == filesize and not overwrite:
                    if not self.quiet:
                        now = datetime.datetime.now(datetime.UTC)
                        print(f'Not downloading {filename} as it exists '
                              'and has the same size')
                        print(f'{now}: Progress at '
                              f'{round((fileno+1)/len(self.ncfiles) * 100,1)} %')

                    continue
            else:
                os.makedirs(self.outpath + self.__subpath, exist_ok=True)

            filecontent = self.get(
                self.__cgiurl+'/'+filename+"?getfile", timeout=self.timeout)
            # check if the transfer worked in the first place, if not continue

            if filecontent.status_code != 200:
                if not self.quiet:
                    print(f'Filetransfer failed for {filename}')
                continue

            with open(self.outpath + self.__subpath + filename, 'wb') as fo:
                fo.write(filecontent.content)

            if not self.quiet:
                now = datetime.datetime.now(datetime.UTC)
                print(f'{now}: Successfully downloaded {filename}, the {dlcount+1} file')
                print(f'{now}: Progress at '
                      f'{round((fileno+1)/len(self.ncfiles) * 100,1)} %')
            dlcount += 1
        now = datetime.datetime.now(datetime.UTC)
        print(f'{now}: Downloaded all {dlcount} files that contained new data '
              f'to {self.outpath + self.__subpath}')
        self.__subpath = ''

    def getzipfiles(self, overwrite=False):
        """
        Download zip files from the CHM-15k to the specified `outpath`.

        Parameters
        ----------
        overwrite : bool, optional
            Whether to overwrite existing files with the same name and size
            in the `outpath`.
            Default is False.

        Raises
        ------
        ValueError
            If `filecount` attribute is False.

        Notes
        -----
        This method first checks whether the `filecount` attribute is set.
        If not, it calls the `getfilelist` method to obtain a list of files
        available for download. Then, for each zip file in the list,
        it checks whether the file already exists in the `outpath` and has
        the same size as the file on the device.
        If not, it downloads the file using a GET request and saves it
        to the `outpath`.

        """
        if not self.filecount:
            self.getfilelist()

        os.makedirs(self.outpath, exist_ok=True)

        for fileno, (filename, filesize) \
                in enumerate(zip(self.zipfiles, self.zipsizes)):
            if self.download2subdirs:
                self.__subpath = self._filename2datefolder(filename)
            # check if the file exists, and if it does has the same size
            # if so continue
            if os.path.exists(self.outpath + self.__subpath + filename):
                fs = os.path.getsize(self.outpath + self.__subpath + filename) // 1024
                if fs == filesize and not overwrite:
                    if not self.quiet:
                        print('File already exists and has '
                              f'the same size ({filename})')
                    continue
            else:
                os.makedirs(self.outpath + self.__subpath, exist_ok=True)

            filecontent = self.get(
                self.__cgiurl+'/'+filename+"?getfile", timeout=self.timeout)
            # check if the transfer worked in the first place, if not continue
            if filecontent.status_code != 200:
                if not self.quiet:
                    print(f'Filetransfer failed for {filename}')
                continue

            with open(self.outpath + self.__subpath + filename, 'wb') as fo:
                fo.write(filecontent.content)

            if not self.quiet:
                now = datetime.datetime.now(datetime.UTC)
                print(f'{now}: Successfully downloaded {filename}')
                print(f'{now}: Progress at '
                      f'{round((fileno+1)/len(self.zipfiles) * 100,1)} %')
        now = datetime.datetime.now(datetime.UTC)
        print(f'{now}: Downloaded all {len(self.zipfiles)} available '
              f'zip files at {self.outpath + self.__subpath}')
        self.__subpath = ''

if __name__ == '__main__':
    url = ''  # the url to connect to, either http/s or ip directly of the chm15k
    a = chm15ksession(url,
                      outpath='./',
                      quiet=False)

    # establish a connection, setting up a session, this will be done automatically
    # upon calling other get functions
    a.connect()
    
    # get the available files in case you want to download only one file
    a.getfilelist()
    
    # usually, one is interested only in the netcdf files that are available,
    # especially in an operational setting where other files have already
    # been downloaded. 
    # per default, existing files are not downloaded again
    # a.getncfiles()
    
    # zipfiles are created by the device for each month and can be downloaded as well
    # per default, existing files are not downloaded again
    # a.getzipfiles()

Background & motivation

The CHM15k offers the choice between a serial and an Ethernet connection to sample data. While serial connections are tried and tested, especially with data loggers, the reality might be that you don’t have one on-site, that its serial ports are full, or that you would need a USB-to-serial adapter, which can be quite bothersome with Linux machines. We actually do sample a Parsivel2 with our data server at the CLOUDLAB field site, and that requires recompiling the drivers ourselves quite often, as we are running Fedora with its frequent kernel updates.

So we chose to go via the web interface of the Lufft CHM15k, even though it requires a login. The upside is that checking for missing data is quite straightforward, it can be done interactively, and if you forward ports to its network correctly you can also sample it from the outside.

For this purpose, I used the browser inspection tool to see what happens when the password is sent and then used a requests session to stay authenticated. The rest is fairly standard file checking and downloading. The script above only needs to be changed once, namely to set the correct URL (which can be the IP or similar, including a port of course). Be aware that you should really change the password if you make your device world-accessible via port forwarding.
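To illustrate the idea of logging in once and letting the session carry the cookies afterwards, here is a minimal sketch. It is not the code of the class above: `LOGIN_PATH` and `PASSWORD_FIELD` are hypothetical placeholders, and the real values have to be read from the browser inspection tool for your device. The `session` argument is assumed to be a `requests.Session` (or anything with the same `post` method).

```python
# Hypothetical placeholders, NOT the real CHM15k values; look them up
# in the browser inspection tool while logging in manually.
LOGIN_PATH = '/login'
PASSWORD_FIELD = 'password'


def login(session, base_url, password, timeout=10):
    """
    Send the password once via POST.

    Cookies set by the device are stored on the session object, so later
    requests made with the same session stay authenticated.
    Returns True if the device answered with HTTP 200.
    """
    response = session.post(base_url + LOGIN_PATH,
                            data={PASSWORD_FIELD: password},
                            timeout=timeout)
    return response.status_code == 200


# Usage (assuming the requests package is installed):
# import requests
# with requests.Session() as s:
#     if login(s, 'http://<ip-or-host>', '<your password>'):
#         ...  # further s.get(...) calls reuse the login cookies
```

A status code of 200 is only a rough success criterion here; a device may well answer 200 with an error page, so checking the response body might be needed in practice.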

Once that is done, you can run the file via a cron job or task scheduler as often as you want, since files that already exist locally with the same size are skipped. Alternatively, import the class and explore its functionality yourself, e.g. for downloading single files. I hope this helps someone out there to sample their ceilometer more easily.
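The reason repeated runs are cheap is the size check done before each download. Pulled out of the loops in `getncfiles` and `getzipfiles`, it boils down to the sketch below. The helper name `needs_download` is made up for illustration and is not part of the class, and the integer division by 1024 assumes, as the script does, that the device lists file sizes in KiB.

```python
import os


def needs_download(local_path, remote_size_kib, overwrite=False):
    """
    Decide whether a file has to be fetched (again).

    A file is skipped only if it already exists locally, has the same
    size (in whole KiB) as listed by the device and overwrite is False.
    """
    if overwrite or not os.path.exists(local_path):
        return True
    # same conversion as in the script: bytes -> whole KiB
    local_size_kib = os.path.getsize(local_path) // 1024
    return local_size_kib != remote_size_kib
```

A file that is still being written on the device grows between two runs, so its size no longer matches and it is fetched again on the next run.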