Monthly Archives: August 2019

Building a Rain Predictor. Preprocessing the data.

The index to the articles in this series is found here.

I wrote the new network topology and started training it. It was taking a very long time to begin training, and I tracked that down to the generator taking a long time to prepare data batches. It seems that retrieving and operating on random elements from a numpy array is costly, as the numbers have to be converted into python entities for use. A bit like the cost of cons-ing things in Lisp, so we try to minimize that.

I decided that it would be best to preprocess the data up front, during the construction of the binary intermediate files. That way I could retrieve the data in a format that could be rapidly converted to a numpy array suitable for passing to a Keras Input layer.

I’ve spent a lot of time on this project on the preprocessor, more, probably, than I’ve spent on the neural networking code itself. That’s probably not surprising, once the network design is laid out, that part is simple, but preparing the data for use is a relatively time-consuming task. Feature selection, feature engineering, normalization, and other activities along these lines.

So, the preprocessed binary files now have payload types. In addition to the raw data and the coarse-scaled data, there’s a new payload type that contains an input vector for the neural network. This input vector is represented as a set of unsigned 8-bit values, so when the generator loads it, it simply has to convert the bytearray to a numpy array and divide the elements by 255.

These changes are now in the git tree. I’m preprocessing my files overnight, and will try training again in the morning.

Building a Rain Predictor. Coarse-scaled improvements.

The index to the articles in this series is found here.

So, with the 4×4 coarse-scaling active, the network now trains in under two hours per epoch. That’s certainly an improvement, but laying out the problem in code and trying it out has clarified some points for me, and I now think I can do much better.

Oh, to people interested in the code that I include in these articles, always check in the github archive for changes, since I won’t be going back into the postings for every bugfix.

Now, let’s look at the network in more detail. You’ll recall that my motivation over the course of these articles has been to try to reduce the number of nodes in the system. There are two reasons for this. First, a larger number of nodes will take longer to train and consume more memory, so it will be more difficult to get to a working network. Second, a large number of nodes implies a large number of free parameters in the model, and I want to reduce the risk of overfitting.

So, let’s think about the network I laid out. We’ll use the un-coarsened system. The active pixels form a disc with a radius of 240 pixels. That means we have roughly 180000 meaningful pixels per time step. Ignoring the tripwire, which is small on this scale, each pixel feeds into one module. The bullseye module is about 1250 pixels, the rest are in a ring module. The ring modules have 1000 nodes on their first layer, while the bullseye has 300. The ring module networks are shared, four input modules feed each network. This means that the first layer has a total of 45 million weights. That’s kind of ridiculously high. The modules have much smaller output node counts, 5 each, for an additional 5000 or 1500 weights, insignificant compared to the big first layer. We have about 120 inputs to the LSTM layer, still much less than a million weights through there, and the synth layer and output layers are also small.

I think I can do without those 45 million weights. While planning the coarsening code (think of max and avg pooling layers in TensorFlow), I started to wonder what the module layers are intended to do, and whether they were really necessary.

The module layers are there to do some automated feature selection before the LSTM layer. I thought they might pick up shapes like cold fronts which produce lines of rain, small spots of moderate rainfall intensity over a few dozen pixels, or diffuse uniform regions when we just have a light drizzle over huge areas. Those are the three patterns I see most often on the weather radar. The LSTM layer would then extract the through-time behaviour to figure out how these patterns were moving through the region. The synthesis layers would take this raw through-time data and figure out how to make the output values from it.

Still, 45 million weights. That’s a lot.

Recall the diagram I put up of the modules from before. The dartboard with 33 regions on it. What if I made that diagram significantly finer-grained? Maybe 400 regions to start with. Then, I skip the entire module step. Each region has only two numbers, average and maximum intensity. I feed these into the LSTM layer directly, with no lower layers. That throws away 45 million weights. Instead, we have 800 inputs, and let’s say 500 neurons in the LSTM layer. The computational burden of the LSTM layer is something I’m not ready to estimate yet, if all those 500 neurons feed back into the network, then it’s still only 650000 weights, and the cost of the time tracking math isn’t that high.

So, that’s the next thing to try. This means rewriting the data generator and the training code with its network topology. I’m going to make new files for these, rather than editing the old ones, just so that it’s easier to bring them up side by side from a git checkout.

I’m going to code this up next.

Building a Rain Predictor. Coarse-scaling the data.

The index to the articles in this series is found here.

Last time we left our network trainer, it was plugging away at 40GB of virtual space, looking to spend a couple of days per epoch for training. It never made it through the first epoch, some time overnight the memory pressure became so great that my desktop got OOMed, taking the training process with it (I hadn’t run it inside nohup).

Well, we had already decided that this run wasn’t going to work. So, I decided the next thing to try was to coarse-grain the data. This is an invasive change, so I set a git tag on the source code called “FirstVersion”, and started editing.

I updated my intermediate binary file to support multiple embedded coarsenesses. This changed the API somewhat, so several files that made use of RpBinReaders had to be modified. My coarsened files contain two data points, maximum and average for the 2×2, 3×3, or 4×4 (by default) non-overlapping coarsening stencils.

Now, my data points have always been single bytes, I didn’t want to go to floats because of the higher cost in disk space and reading (the files are internally compressed, so there’s a CPU cost with reading them). While the maximum value of a set of bytes is easily represented in a single byte, the exact average is not so simple. But, we don’t need exact averages, something close enough is fine. Our input data isn’t perfect to begin with, it’s binned into 14 levels. So, since the average of a set of non-negative numbers is necessarily between 0 and the maximum value, I reserved a second byte to hold a value such that max * VAL2 / 255 is close to the average value in the cell. Inexpensive, and clearly good enough for our purposes.

The changes have been pushed to the git tree, and I’m rerunning with a 4×4 coarsening. This is looking to be a lot faster, with only one-eighth of the number of neurons (1/16 in pixels, but each pixel is now two values). We’re expecting roughly 3 hours per epoch, and memory consumption is similarly reduced.

I’ll watch this to see how it’s running. Meanwhile, though, thinking about how the high number of neurons needed to be reduced, and the coarsening I’ve been using, has led me to thinking of a radically different solution that I’ll discuss soon. Still neural networks on coarsened pixels, but different from what I’ve been showing so far. Anyway, that’s coming up.

Building a Rain Predictor. Training the model.

The index to the articles in this series is found here.

Finally, we’re training the model. I started with the most basic model. That is, no noise introduction, no special regularization, I just wanted to get the training running first, to see how things go. It is… not quick.

Here’s the training code, rptrainer.py:

#! /usr/bin/python3

# Here we go.  Training the neural network.

import rpreddtypes
import rpgenerator
import argparse
import random

import tensorflow as tf
# from tensorflow.keras.callbacks import TensorBoard, EarlyStopping

import keras
from keras.layers import Input, Dense, Concatenate, LSTM
from keras.models import Sequential, Model

import sys
import numpy as np


    

### Main code entry point here


defBatchSize = 512
ring_module_nodes_0 = 1000
ring_module_nodes_1 = 5
bullseye_module_nodes_0 = 1000
bullseye_module_nodes_1 = 5
tripwire_module_nodes_0 = 40
tripwire_module_nodes_1 = 5
synth_layer_nodes = 200
num_outputs = 10


parser = argparse.ArgumentParser(description='Train the rain '
                                 'prediction network.')
parser.add_argument('--continue', dest='Continue',
                    action='store_true',
                    help='Whether to load a previous state and '
                    'continue training')
parser.add_argument('--pathfile', type=str, dest='pathfile',
                    required=True,
                    help='The file that maps sequence numbers to '
                    'the pathnames of the binary files.')
parser.add_argument('--training-set', type=str, dest='trainingset',
                    required=True,
                    help='The file containing the training set '
                    'to use.')
parser.add_argument('--veto-set', type=str, dest='vetoset',
                    help='If supplied, its argument is the name '
                    'of a file containing training set entries that '
                    'are to be skipped.')
parser.add_argument('--test-set', type=str, dest='testset',
                    required=True,
                    help='The test set used to detect overfitting '
                    'during training.')
parser.add_argument('--savefile', type=str, dest='savefile',
                    help='The filename at which to save the '
                    'trained network parameters.  A suffix will be '
                    'applied to the name to avoid data '
                    'incompatibility.')
parser.add_argument('--override-centre', type=list, dest='centre',
                    default=[240,239], help='Set a new location for '
                    'the pixel coordinates of the radar station')
parser.add_argument('--override-sensitive-region', type=list,
                    dest='sensitive',
                    default=[[264,204], [264,205], [265,204], [265,205]],
                    help='Set a new list of sensitive pixels')
parser.add_argument('--heavy-rain-index', type=int, dest='heavy',
                    default=3, help='Lowest index in the colour table '
                    'that indicates heavy rain, where 1 is the '
                    'lightest rain.')
parser.add_argument('--batchsize', type=int, dest='batchsize',
                    default=defBatchSize,
                    help='Override the batch size used for training.')

args = parser.parse_args()


trainGenerator = rpgenerator.RPDataGenerator(args.trainingset,
                                             args.pathfile,
                                             args.vetoset,
                                             args.centre,
                                             args.sensitive,
                                             args.heavy,
                                             args.batchsize)

validationGenerator = rpgenerator.RPDataGenerator(args.testset,
                                                  args.pathfile,
                                                  args.vetoset,
                                                  args.centre,
                                                  args.sensitive,
                                                  args.heavy,
                                                  args.batchsize)

hashval = rpreddtypes.genhash(args.centre, args.sensitive, args.heavy)
modsizes = trainGenerator.getModuleSizes()

if args.savefile:
    args.savefile = args.savefile + str(hashval)


if args.Continue:
    if not args.savefile:
        print('You asked to continue by loading a previous state, '
              'but did not supply the savefile with the previous state.')
        sys.exit(1)

    mymodel = keras.models.load_model(args.savefile)
    
else:

    inputslist = list(map(lambda x:
                          Input(batch_shape=(args.batchsize, 6, x)),
                          modsizes))

    evenmodels = []
    oddmodels = []

    for i in range(4):
        onemod = Sequential()
        onemod.add(Dense(ring_module_nodes_0, activation='relu'))
        onemod.add(Dense(ring_module_nodes_1, activation='relu'))
        evenmodels.append(onemod)
        onemod = Sequential()
        onemod.add(Dense(ring_module_nodes_0, activation='relu'))
        onemod.add(Dense(ring_module_nodes_1, activation='relu'))
        oddmodels.append(onemod)

    bullseyemodel = Sequential()
    bullseyemodel.add(Dense(bullseye_module_nodes_0, activation='relu'))
    bullseyemodel.add(Dense(bullseye_module_nodes_1, activation='relu'))

    tripwiremodel = Sequential()
    tripwiremodel.add(Dense(tripwire_module_nodes_0, activation='relu'))
    tripwiremodel.add(Dense(tripwire_module_nodes_1, activation='relu'))

    
    
        
    scanned = []
    for i in range(32):
        mtype = i % 2
        ringnum = i // 8
        if mtype == 0:
            scanned.append(evenmodels[ringnum](inputslist[i]))
        else:
            scanned.append(oddmodels[ringnum](inputslist[i]))

    scanned.append(bullseyemodel(inputslist[32]))
    scanned.append(tripwiremodel(inputslist[33]))

    aggregated = Concatenate()(scanned)

    time_layer = LSTM(6, stateful = False, activation='relu')(aggregated)

    synth_layer = Dense(synth_layer_nodes, activation='relu')(time_layer)
    output_layer = Dense(num_outputs, activation='sigmoid')(synth_layer)

    mymodel = Model(inputs=inputslist, outputs=[output_layer])

mymodel.compile(loss='binary_crossentropy', optimizer='sgd')
#                metrics=[tf.keras.metrics.FalsePositives(),
#                         tf.keras.metrics.FalseNegatives()])

ie = 0
while True:
    mymodel.fit_generator(initial_epoch = ie, generator=trainGenerator,
                          validation_data = validationGenerator,
                          shuffle=False)
    ie += 1
    if args.savefile:
        mymodel.save(args.savefile)


This is essentially the same thing I did before in the Keras test code. I have created two dense models per ring, one for the bullseye, and one more for the tripwire. Put an LSTM layer atop those for time series tracking, another dense layer above for synthesis, and an output layer. Compile, and run.

I tried to run multiprocessor, my data generator was deliberately made thread-safe to that end, but ran into an integer overflow error deep in the guts of keras, so turned it back to single processor. I tried to use metrics for FalsePositive and FalseNegative from TensorFlow in the compile operation, but this lead to an error at runtime after the first epoch was fed through the network. This is, apparently, due to some incompatibilities between the versions of TensorFlow and Keras that were installed on my Kubuntu machine. No problem, I can always write a custom Keras metric class later. For now, I commented out the offending metrics and re-ran.

As of right now, I’m 12.5 hours of CPU deep in the first epoch, and the estimated time to completion is showing another 16 hours. Oh, and I put another 40GB of swap online, my python process has a virtual address space size of 35.9GB, 10.5GB resident, and 22GB of swap in use.

So, we’re going to have to back up a bit and figure out a better plan. Two possibilities are coarse-graining the inputs, and convolving them. Coarse-graining is the simpler option at this moment, so I’m going to be doing that first.

Convolving is a bit challenging. Typically, one generates a set of a few dozen random convolutions, possibly pooling them and convolving a second time before training on the whole set. The network then picks out those convolutions that have predictive power, essentially an automated feature selection step. Trying to find convolutions that will work will wind up greatly increasing the size of the model, and we’re already pushing hard against the edge of reasonable. Note, too, that because our ring models are duplicated and rotated, the convolutions would also have to be rotated in four 90-degree quadrants so that consistent data is passed through to the training.

Right, so coarse-graining it is. I’m going to test 2×2, 3×3, and 4×4 reductions, and each generated pixel will be taken from the maximum value of the pixels in its area of coverage.

I’ll be bumping the version number of my intermediate binary file format and putting those coarse-graining options right into the files.

So, we’ve reached a model that compiles and probably trains. We’ll know in about 14 hours whether or not it trains the first epoch successfully. Now, we start to adjust and tweak.

Building a Rain Predictor. Feeding data to the training algorithm.

The index to the articles in this series is found here.

So, we’ve got the data ready to feed to the neural network for training. Our images are circles with a radius of 240 pixels. To process a single element through the network, we have to feed it 6 such images. That’s roughly 1 million pixels loaded into numpy arrays. Let’s say 4 bytes per pixel for floats. We have about 10000 training elements. That’s about 40GB of input data. Now, there’s a lot of overlap between training elements, one image is actually used by almost 6 training elements, so we could, with effort, reduce this to maybe 7 GB for the input sets.

My machine has 16GB of RAM.

There is significant memory overhead in training used by Keras internals, and every 10 minutes my downloading script gives us potentially one more training element.

So, we don’t plan to load the entire training set into memory before passing it into the system. This isn’t really a problem, Keras is set up for doing this quite easily. We need to create a data generator. Here’s mine, rpgenerator.py:

#! /usr/bin/python3

# The data generator for the rain predictor
import keras
# from tensorflow import keras
import rpreddtypes
import math
import numpy


class RPDataGenerator(keras.utils.Sequence):
    'Produces data from the training set records we\'ve built'
    def __init__(self, sequence_file, path_file, veto_file,
                 centre, sensitive_region, heavyThreshold,
                 batch_size, shuffle=False):
        self.radius1 = 20
        self.radius2 = 60
        self.radius3 = 100
        self.radius4 = 170
        self.radius5 = 240
        self.tripradius = 25
        self.sequence_file = sequence_file
        self.path_file = path_file
        self.veto_file = veto_file
        self.centre = centre
        self.sensitive_region = sensitive_region
        self.heavy = heavyThreshold
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.hash = rpreddtypes.genhash(centre, sensitive_region,
                                        heavyThreshold)
        self.pathmap = {}
        self.seqmap = {}
        self.seqlist = []
        self.modules = []

        self.buildModules()
        self.loadFromFiles()

    def loadFromFiles(self):
        with open(self.path_file, 'r') as ifile:
            for record in ifile:
                fields = record.split()
                seqno = int(fields[0])
                self.pathmap[seqno] = fields[1]

        vetolist = []
        if self.veto_file:
            with open(self.veto_file, 'r') as ifile:
                for record in ifile:
                    fields = record.split()
                    seqno = int(fields[0])
                    vetolist.append(seqno)
            
        with open(self.sequence_file, 'r') as ifile:
            for record in ifile:
                fields = record.split()
                seqno = int(fields[0])
                hashno = fields[1]
                if hashno != self.hash:
                    continue
                if seqno in vetolist:
                    continue
                self.seqmap[seqno] = list(map(int, fields[4:]))
                self.seqlist.append(seqno)

        if self.shuffle:
            shuffleSequence()

    def shuffleSequence(self):
        random.shuffle(seqlist)

    def on_epoch_end(self):
        self.shuffleSequence()

    def __len__(self):
        return 1 + ( len(self.seqlist) // self.batch_size )

    def buildModules(self):
        # For each module, we store a list of pixels.  We put the
        # centre in its module.  Then, starting at the centre, we walk
        # up to the outer radius, putting all points in a module.  We
        # then go one pixel to the right, and, starting one pixel
        # above the centre, do it again.  Continue until we've run out
        # of space to the right.  Then we rotate the pixels 90 degrees
        # into their new modules.  This guarantees no missed pixels.

        self.modules = [[] for _ in range(34)]

        # Modules are numbered clockwise from the outermost, to the
        # right of the centreline, 0.  After 7, we go inward one ring
        # and repeat, starting to the right of the centreline.  This
        # covers modules 0-31.  Module 32 is the bullseye, and Module
        # 33 is the tripwire.

        self.modules[32].append(self.centre)

        for i in range(self.centre[0], 2 * self.centre[0]):
            for j in range(self.centre[1], 0, -1):
                deltaI = i - self.centre[0]
                deltaJ = self.centre[1] - j
                distance = math.sqrt(deltaI ** 2 + deltaJ ** 2)
                if distance <= self.radius1:
                    self.modules[32].append([i, j])
                elif distance <= self.radius2:
                    if deltaJ < deltaI:
                        self.modules[24].append([i, j])
                    else:
                        self.modules[25].append([i, j])
                elif distance <= self.radius3:
                    if deltaJ < deltaI:
                        self.modules[16].append([i, j])
                    else:
                        self.modules[17].append([i, j])
                elif distance <= self.radius4:
                    if deltaJ < deltaI:
                        self.modules[8].append([i, j])
                    else:
                        self.modules[9].append([i, j])
                elif distance <= self.radius5:
                    if deltaJ < deltaI:
                        self.modules[0].append([i, j])
                    else:
                        self.modules[1].append([i, j])
                else:
                    break


        nBullseye = len(self.modules[32])
                
        # Now we've loaded one quadrant.  Copy with rotation to the
        # other three quadrants.
        for i in range(6):
            for ring in range(4):
                self.modules[ring * 8 + i + 2] = list(map(lambda p: [ p[1], -p[0]], self.modules[ring * 8 + i]))

            # Do the bullseye
            for p in range(nBullseye):
                pc = self.modules[32][i * nBullseye + p]
                self.modules[32].append([pc[1], -pc[0]])

        
        # Finally, the tripwire
        for radius in range(-self.tripradius, self.tripradius + 1):
            i1 = self.centre[0] + radius
            j1 = self.centre[1] + int(math.sqrt(self.tripradius ** 2 - radius ** 2))
            j2 = 2 * self.centre[1] - j1
            if not [i1, j1] in self.modules[33]:
                self.modules[33].append([i1, j1])
            if not [i1, j2] in self.modules[33]:
                self.modules[33].append([i1, j2])

            j1 = self.centre[1] + radius
            i1 = self.centre[0] + int(math.sqrt(self.tripradius ** 2 - radius ** 2))
            i2 = 2 * self.centre[0] - i1
            if not [i1, j1] in self.modules[33]:
                self.modules[33].append([i1, j1])
            if not [i2, j1] in self.modules[33]:
                self.modules[33].append([i2, j1])


    def getModuleSizes(self):
        return list(map(len, self.modules))

    def normalize(self, val):
        if (val < self.heavy):
            return val / 20.0
        else:
            return (5 + val) / 20.0


    def inputsFromOneFile(self, filename):
        reader = rpreddtypes.RpBinReader()
        reader.read(filename)
        sourcepixels = reader.getNumpyArray()
        rval = []
        for module in range(34):
            nPixels = len(self.modules[module])
            rval.append(numpy.empty(nPixels))
            for pixelIndex in range(nPixels):
                pixel = self.modules[module][pixelIndex]
                rval[module][pixelIndex] = self.normalize(sourcepixels[pixel[0]][pixel[1]])
                
        return rval

        
        
    def __getitem__(self, index):
        'Return one batch'
        # For each module, have to return a numpy array
        # indexed by [offsetInBatch][timestep][pixelIndex]
        # Hand these back in a list, and the y-vals in another list

        rvalX = []
        rvalY = numpy.empty([self.batch_size, 10])
        for i in range(34):
            rvalX.append(numpy.empty([self.batch_size, 6,
                                      len(self.modules[i])]))

        for oib in range(self.batch_size):
            base_seqno = self.seqlist[index * self.batch_size + oib]
            for ts in range(6):
                seqno = base_seqno + ts
                filename = self.pathmap[seqno]
                inputs = self.inputsFromOneFile(filename)
                for m in range(34):
                    rvalX[m][oib][ts] = inputs[m]

            rvalY[oib] = numpy.asarray(self.seqmap[base_seqno])

        return rvalX, rvalY

For now, I’ve got some hard-coded magic numbers, as I’ve been eager to get to the training. As we’ll see soon, this will have to change…

So, what the data generator does is to hand off data to the training system in batches. Keras loads one entire batch at once, feeds it through the network, and then adjust the weights based only on that subset of the training set. It then requests the next batch of inputs, and runs it through the modified network, once more adjusting the weights as it goes. Keras recommends using batches as large as manageable in your system memory.

One entire pass through the training set is called an epoch. We randomize the data, then run as many iterations as fit inside the training set size. In this implementation, for simplicity I’ve set it up so the remainder of the training elements are not seen in that epoch. After each epoch, the data is rerandomized, so elements skipped in one epoch are likely to be seen in the following epoch.

A function called buildModules() figures out which pixels are assigned to which modules. Then, as images come in, we can pass the data quickly to the appropriate module.

We have to normalize the data. Neural networks are happier when the data is bounded, and it’s more convenient to have these inputs range from 0 for no rain to 1 for the highest represented intensity. Note, however, that I make a distinction between heavy rain and very light rain. To help the network to separate these two cases, I’ve set up the normalization so that the gap between the highest “light rain” value and the lowest “heavy rain” value is larger than the gap between other consecutive intensities.

With this, we’re finally ready to start training the network. That will be covered in the next article.