Implementing a pipeline step for an MNIST Dataset Loader


#1

Hello,

I’m trying to implement a pipeline step for an MNIST Dataset Loader, using the approach in the Kaggle Open Solution Data Science Bowl 2018 repository.

The current state of its implementation can be found here:

My pipeline looks something like this:

def lenet5(config, train_mode):
    if train_mode:
        save_output = False
        load_saved_output = False
    else:
        save_output = False
        load_saved_output = False

    # TODO: Implement loader.
    loader = Step(name='loader',
                  transformer=MNISTLoader(**config.loader),
                  input_data=['input'],
                  adapter={'X': ([('input', 'X')]),
                           'y': ([('input', 'y')]),
                           'train_mode': ([('input', 'train_mode')]),
                           'X_valid': ([('input', 'X_valid')]),
                           'y_valid': ([('input', 'y_valid')]),
                           },
                  cache_dirpath=config.env.cache_dirpath)

    lenet5 = Step(name='lenet5',
                  transformer=PyTorchLeNet5(**config.lenet5),
                  input_steps=[loader],
                  cache_dirpath=config.env.cache_dirpath,
                  save_output=save_output, load_saved_output=load_saved_output)

    output = Step(name='output',
                  transformer=Dummy(),
                  input_steps=[lenet5],
                  adapter={'y_pred': ([(lenet5.name, 'labeled_images')]),
                           },
                  cache_dirpath=config.env.cache_dirpath)
    return output

An existing example of loading the MNIST dataset can be found here: https://github.com/pytorch/examples/blob/master/mnist/main.py

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=args.batch_size, shuffle=True, **kwargs)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=args.test_batch_size, shuffle=True, **kwargs)

The MNISTDataset class currently looks like this:

class MNISTDataset(Dataset):
    """`MNIST <http://yann.lecun.com/exdb/mnist/>`_ Dataset.

    Based on MNIST class from torchvision/datasets/mnist.py

    """

    def __init__(self, root, train_mode, transform=None, target_transform=None, download=False):
        super().__init__()
        self.root = os.path.expanduser(root)
        self.transform = transform
        self.target_transform = target_transform
        self.train_mode = train_mode  # load training set or test set

        if download:
            self.download()

        if not self._check_exists():
            raise RuntimeError('Dataset not found.' +
                               ' You can use download=True to download it')

        if self.train_mode:
            self.train_data, self.train_labels = torch.load(
                os.path.join(self.root, self.processed_folder, self.training_file))
        else:
            self.test_data, self.test_labels = torch.load(
                os.path.join(self.root, self.processed_folder, self.test_file))

    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            tuple: (image, target) where target is index of the target class.
        """
        if self.train_mode:
            img, target = self.train_data[index], self.train_labels[index]
        else:
            img, target = self.test_data[index], self.test_labels[index]

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = Image.fromarray(img.numpy(), mode='L')

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target

    def __len__(self):
        if self.train_mode:
            return len(self.train_data)
        else:
            return len(self.test_data)

This is the current state of the MNISTLoaderBasic class.

I just need some help figuring out how to implement these classes so that they can be used as a step in a pipeline.

class MNISTLoaderBasic(BaseTransformer):
    def __init__(self, loader_params, dataset_params):
        super().__init__()
        self.loader_params = AttrDict(loader_params)
        self.dataset_params = AttrDict(dataset_params)

        self.dataset = None

    def transform(self, X, y, X_valid=None, y_valid=None, train_mode=True):
        if train_mode and y is not None:
            flow, steps = self.get_datagen(X, y, True, self.loader_params.training)
        else:
            flow, steps = self.get_datagen(X, None, False, self.loader_params.inference)

        if X_valid is not None and y_valid is not None:
            valid_flow, valid_steps = self.get_datagen(X_valid, y_valid, False, self.loader_params.inference)
        else:
            valid_flow = None
            valid_steps = None
        return {'datagen': (flow, steps),
                'validation_datagen': (valid_flow, valid_steps)}

    def get_datagen(self, X, y, train_mode, loader_params):
        if train_mode:
            dataset = self.dataset(X, y,
                                   train_mode=True)
        else:
            dataset = self.dataset(X, y,
                                   train_mode=False)

        datagen = DataLoader(dataset, **loader_params)
        steps = len(datagen)
        return datagen, steps


class MNISTLoader(MNISTLoaderBasic):
    def __init__(self, loader_params, dataset_params):
        super().__init__(loader_params, dataset_params)
        self.dataset = MNISTDataset('data/mnist', train_mode=True,
                                    transform=transforms.Compose([transforms.ToTensor(),
                                                                  transforms.Normalize((0.1307,), (0.3081,))]),
                                    target_transform=None,
                                    download=True)

#2

At the moment, I get the following error:

  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 92, in fit_transform
    step_inputs[input_step.name] = input_step.fit_transform(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 92, in fit_transform
    step_inputs[input_step.name] = input_step.fit_transform(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 95, in fit_transform
    step_inputs = self.adapt(step_inputs)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 181, in adapt
    raw_inputs = [step_inputs[step_name][step_var] for step_name, step_var in step_mapping]
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 181, in <listcomp>
    raw_inputs = [step_inputs[step_name][step_var] for step_name, step_var in step_mapping]
KeyError: 'X'

#3

Hi @elvisdowson,

Later today I will take a closer look at your code. I will get back to you with a fix or some advice.

Best,
Kamil


#4

Great, thank you!

There was no documentation on the Step class and how the adapter is supposed to work, which is why I got stuck.

I’ve understood most of it, in terms of defining new model architectures and pipelines. But I’m missing some fundamental information on how to work with the Step library, the interaction between a dataset and a dataloader, and how adapters work in this context.

I intentionally chose a simple example, like MNIST, to understand the Open Solution pipeline approach. After this, I will work on a more complex pipeline and try to do design space exploration and benchmark the performance of different neural network models.


#5

Hi @elvisdowson,

This is an error in the adapter. It happens simply because you do not have X or y in your MNIST loader, so there is no X to operate on: in your implementation, you load the data directly in the MNISTLoader.

Simple fix:

In your lenet5 pipeline, go to lines 23-27 (the adapter arg in the loader Step) and remove X, y, X_valid and y_valid:
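
For example, a minimal sketch of the loader step after that change (keeping only train_mode; whether you also drop that key depends on your transform signature):

loader = Step(name='loader',
              transformer=MNISTLoader(**config.loader),
              input_data=['input'],
              adapter={'train_mode': ([('input', 'train_mode')]),
                       },
              cache_dirpath=config.env.cache_dirpath)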

This should fix your error.

Let me know if it worked!

Best,
Kamil


#6

Hey @elvisdowson,

Just to let you know, we are open-sourcing steps as a separate library. The goal is to build a lightweight library that enables you to build and control ML pipelines easily.

Check out these notebooks that introduce steps:

We are also working on docstrings on this branch -> readthedocs soon! :slight_smile:

First release on June 1st.

You are welcome to provide us with your thoughts or comments.

Best,
Kamil and @jakub_czakon


#7

Should I delete the adapter altogether, or should I leave it with the train_mode key (after having removed X, y, X_valid and y_valid)?

  adapter={'train_mode': ([('input', 'train_mode')]),
           },

In either case, I get the following error:

File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 242, in fit_transform
    return self.transform(*args, **kwargs)
TypeError: transform() missing 2 required positional arguments: 'X' and 'y'

One of the difficulties I face with the Steps library is that it is hard to trace the source of a problem, since errors are always reported from within the base class rather than from the file where the adapter or transformer was used.

If you look at the entire exception output, it makes no reference to pipeline.py, which I’m guessing is the source of the error that I am now getting for the transformer.

2018-05-10 08:26:03 INFO     | base:_cached_fit_transform:108: step loader fitting and transforming...
Traceback (most recent call last):
  File "/project/software/tutorial/pytorch/pytorch-template/source/main.py", line 66, in <module>
    action()
  File "/tool/python/conda/env/gis36/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/tool/python/conda/env/gis36/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/tool/python/conda/env/gis36/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tool/python/conda/env/gis36/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tool/python/conda/env/gis36/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/project/software/tutorial/pytorch/pytorch-template/source/main.py", line 43, in train_pipeline
    _train_pipeline(pipeline_name)
  File "/project/software/tutorial/pytorch/pytorch-template/source/main.py", line 58, in _train_pipeline
    pipeline.fit_transform(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 92, in fit_transform
    step_inputs[input_step.name] = input_step.fit_transform(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 92, in fit_transform
    step_inputs[input_step.name] = input_step.fit_transform(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 98, in fit_transform
    step_output_data = self._cached_fit_transform(step_inputs)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 109, in _cached_fit_transform
    step_output_data = self.transformer.fit_transform(**step_inputs)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 242, in fit_transform
    return self.transform(*args, **kwargs)
TypeError: transform() missing 2 required positional arguments: 'X' and 'y'

#8

I missed the fact that the steps library had any code, because the master branch did not contain anything beyond markdown files.

I have one comment about licensing. It would be better to use either an MIT or BSD license for the Steps library to help facilitate widespread adoption. If the concern is contribution back to the library, people would still contribute back under an MIT license. Having a GPL v3 license just complicates things, and as a general rule I tend to avoid mixing GPL v3 licenses with permissive ones. Most machine learning libraries and frameworks have permissive licenses; I’ve rarely come across one that is GPL-based.

Additionally, some of the existing minerva-ml open solutions, DSB 2018 for example, contain a version of the steps library released under an MIT license. I’ve based the pytorch-template in my repo on the DSB 2018 code base. This creates some confusion about the applicable license terms for the steps library, since it already exists in some form in the DSB 2018 repo under an MIT license.

If Steps is going to be licensed under GPL v3, then I plan to stop using it in my pytorch-template and use the Stanford CS230 code base instead.


#9

Hi @elvisdowson,

Let me answer one-by-one.

1: Debugging Steps

I agree. We are working on that. We will add more assertions with user-friendly messages. We are also discussing how to simplify adapters in general, so that they are easier to use. Moreover, we are planning to add more info to the logger, so that developers will have a better understanding of the process.
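
For instance, the adapt check that raised your KeyError: 'X' could assert with a readable message; a hypothetical sketch (not yet in the codebase) of what Step.adapt in base.py might do:

# instead of the bare list comprehension that raises KeyError: 'X',
# validate the adapter mapping and name the offending step explicitly
for step_name, step_var in step_mapping:
    assert step_name in step_inputs, \
        "step '{}': adapter references unknown input step '{}'".format(self.name, step_name)
    assert step_var in step_inputs[step_name], \
        "step '{}': input step '{}' has no output '{}'".format(self.name, step_name, step_var)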

Do you have any other ideas on how to improve it?

2: Steps dev branch

Steps is now being developed on the dev branch. The first release, as a pip-installable module, is planned for June 1st.

3: License

This is a fair point; I agree with you. Our goal is to maximize adoption of our software, so we do not want to create any additional obstacles. I have already changed the LICENSE to the MIT license.

4: Error: positional args X and y
I want to reproduce your error, so please let me know how you start your experiment -> exact command and codebase.

Best,
Kamil


#10

Great!

The code base is the current master branch of https://github.com/edowson/pytorch-template.git

The command to run it is:

python3.6 main.py -- train_pipeline --pipeline_name lenet5

The environment.yaml contains several packages, including optimized ones from the Intel conda channel. The following commands should set up a basic environment; you’d have to add the missing packages:

conda install anaconda
conda install pytorch torchvision cuda91 -c pytorch
conda install colorlog attrdict keras click
pip install neptune-cli

The current error as of commit id 2fbee981c38174183b903616c52a090b78104cc0 is:

  File "/project/software/tutorial/pytorch/pytorch-template/source/step/base.py", line 242, in fit_transform
    return self.transform(*args, **kwargs)
TypeError: transform() missing 2 required positional arguments: 'X' and 'y'

#11

I’ve attached a patch file against this commit, which you can apply locally to see the changes I’ve made to get it moving a bit further along.

diff --git a/source/main.py b/source/main.py
index 3380804..90a0040 100644
--- a/source/main.py
+++ b/source/main.py
@@ -4,8 +4,8 @@ import sys
 # append common modules to sys.path
 sys.path.append(os.path.join(os.path.dirname(sys.path[0]), '.'))
 
-from multiprocessing import set_start_method
-set_start_method('spawn')
+#from multiprocessing import set_start_method
+#set_start_method('spawn')
 
 import click
 import glob
@@ -22,7 +22,8 @@ from source.pipeline.pipeline import Pipeline, PipelineConfig
 
 
 log = logger.init_logger('pt', 'experiment/output.log')
-params = Params('experiment/lenet5_model/params.yaml', ParameterFileType.YAML, ctx=None)
+ctx = neptune.Context()
+params = Params('experiment/lenet5_model/neptune.yaml', ParameterFileType.NEPTUNE, ctx=ctx)
 
 # configure pipeline
 p = Pipeline(params)
diff --git a/source/pipeline/pipeline.py b/source/pipeline/pipeline.py
index b92e593..4e0c39c 100644
--- a/source/pipeline/pipeline.py
+++ b/source/pipeline/pipeline.py
@@ -46,7 +46,6 @@ class Pipeline:
         self.params = params
 
         self.PIPELINES = {'lenet5': {'train': partial(lenet5, train_mode=True),
-                                     'inference': partial(lenet5, train_mode=False),
                                      }
                           }
 
diff --git a/source/step/loader/loader.py b/source/step/loader/loader.py
index c1a8c0b..c8f88b9 100644
--- a/source/step/loader/loader.py
+++ b/source/step/loader/loader.py
@@ -15,29 +15,16 @@ class MNISTLoaderBasic(BaseTransformer):
 
         self.dataset = None
 
-    def transform(self, X, y, X_valid=None, y_valid=None, train_mode=True):
-        if train_mode and y is not None:
-            flow, steps = self.get_datagen(X, y, True, self.loader_params.training)
-        else:
-            flow, steps = self.get_datagen(X, None, False, self.loader_params.inference)
-
-        if X_valid is not None and y_valid is not None:
-            valid_flow, valid_steps = self.get_datagen(X_valid, y_valid, False, self.loader_params.inference)
-        else:
-            valid_flow = None
-            valid_steps = None
-        return {'datagen': (flow, steps),
-                'validation_datagen': (valid_flow, valid_steps)}
-
-    def get_datagen(self, X, y, train_mode, loader_params):
+    def transform(self, train_mode=True):
         if train_mode:
-            dataset = self.dataset(X, y,
-                                   train_mode=True)
+            flow, steps = self.get_datagen(loader_params=self.loader_params.training)
         else:
-            dataset = self.dataset(X, y,
-                                   train_mode=False)
+            flow, steps = self.get_datagen(loader_params=self.loader_params.inference)
+
+        return {'datagen': (flow, steps)}
 
-        datagen = DataLoader(dataset, **loader_params)
+    def get_datagen(self, loader_params):
+        datagen = DataLoader(self.dataset, **loader_params)
         steps = len(datagen)
         return datagen, steps
 
diff --git a/source/step/pytorch/model.py b/source/step/pytorch/model.py
index 3056f06..51ecb84 100644
--- a/source/step/pytorch/model.py
+++ b/source/step/pytorch/model.py
@@ -54,22 +54,22 @@ class Model(BaseTransformer):
         else:
             self.model = self.model
 
-        self.callbacks.set_params(self, validation_datagen=validation_datagen)
-        self.callbacks.on_train_begin()
+        #self.callbacks.set_params(self, validation_datagen=validation_datagen)
+        #self.callbacks.on_train_begin()
 
         batch_gen, steps = datagen
         for epoch_id in range(self.training_config['epochs']):
-            self.callbacks.on_epoch_begin()
+            #self.callbacks.on_epoch_begin()
             for batch_id, data in enumerate(batch_gen):
-                self.callbacks.on_batch_begin()
+                #self.callbacks.on_batch_begin()
                 metrics = self._fit_loop(data)
-                self.callbacks.on_batch_end(metrics=metrics)
+                #self.callbacks.on_batch_end(metrics=metrics)
                 if batch_id == steps:
                     break
-            self.callbacks.on_epoch_end()
+            #self.callbacks.on_epoch_end()
             if self.callbacks.training_break():
                 break
-        self.callbacks.on_train_end()
+        #self.callbacks.on_train_end()
         return self
 
     def _fit_loop(self, data):

Q01: How can I ensure that the train_mode parameter value is correctly being propagated to the MNISTLoader class?

class MNISTLoaderBasic(BaseTransformer):
    def __init__(self, loader_params, dataset_params):
        super().__init__()
        self.loader_params = AttrDict(loader_params)
        self.dataset_params = AttrDict(dataset_params)

        self.dataset = None

    def transform(self, train_mode=True):
        if train_mode:
            flow, steps = self.get_datagen(loader_params=self.loader_params.training)
        else:
            flow, steps = self.get_datagen(loader_params=self.loader_params.inference)

        return {'datagen': (flow, steps)}

    def get_datagen(self, loader_params):
        datagen = DataLoader(self.dataset, **loader_params)
        steps = len(datagen)
        return datagen, steps


class MNISTLoader(MNISTLoaderBasic):
    def __init__(self, loader_params, dataset_params):
        super().__init__(loader_params, dataset_params)
        self.dataset = MNISTDataset('data/mnist', train_mode=True,
                                    transform=transforms.Compose([transforms.ToTensor(),
                                                                  transforms.Normalize((0.1307,), (0.3081,))]),
                                    target_transform=None,
                                    download=True)
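
To illustrate what I mean, here is my current assumption (a sketch only, based on base.py calling self.transformer.fit_transform(**step_inputs) after adapt()): train_mode would have to travel from the data dict passed to the pipeline, through the loader Step's adapter, into transform():

# hypothetical: the 'input' data dict fed to pipeline.fit_transform in main.py
# must carry a 'train_mode' entry for the adapter to pick up
data = {'input': {'train_mode': True}}
pipeline.fit_transform(data)

# with the loader Step keeping only this adapter entry:
# adapter={'train_mode': ([('input', 'train_mode')])}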

Q02: How can I assign a loss function to the model, e.g. for logistic regression?

At the moment I get the following error:

  File "/project/software/tutorial/pytorch/pytorch-template/source/step/pytorch/model.py", line 65, in fit
    metrics = self._fit_loop(data)
  File "/project/software/tutorial/pytorch/pytorch-template/source/step/pytorch/model.py", line 99, in _fit_loop
    len(self.loss_function))
TypeError: object of type 'NoneType' has no len()

This is probably because I haven’t defined a loss function for the PyTorchLeNet5 class.

class PyTorchLeNet5(Model):
    def __init__(self, architecture_config, training_config, callbacks_config):
        super().__init__(architecture_config, training_config, callbacks_config)

        self.model = LeNet5(**architecture_config['model_params'])
        self.weight_regularization = weight_regularization_lenet5
        self.optimizer = optim.Adam(self.weight_regularization(self.model,
                                                               **architecture_config['regularizer_params']),
                                    **architecture_config['optimizer_params'])

How would I go about assigning a compatible loss function?

According to the examples, LogisticRegression should be a step, e.g.

import joblib
from sklearn.linear_model import LogisticRegression

from step.base import BaseTransformer

class LogRegTransformer(BaseTransformer):
    def __init__(self):
        self.estimator = LogisticRegression()

    def fit(self, X, y):
        self.estimator.fit(X, y)
        return self

    def transform(self, X, **kwargs):
        y_pred = self.estimator.predict(X)
        return {'y_pred': y_pred}

    def save(self, filepath):
        joblib.dump(self.estimator, filepath)

    def load(self, filepath):
        self.estimator = joblib.load(filepath)
        return self

but I don’t understand where this step has to be assigned, since it errors out in the Model base class.

  File "/project/software/tutorial/pytorch/pytorch-template/source/step/pytorch/model.py", line 99, in _fit_loop
    len(self.loss_function))
TypeError: object of type 'NoneType' has no len()

which corresponds to this assertion:

    assert len(targets_tensors) == len(outputs_batch) == len(self.loss_function),\
        '''Number of targets, model outputs and elements of loss function must equal.
        You have n_targets={0}, n_model_outputs={1}, n_loss_function_elements={2}.
        The order of elements must also be preserved.'''.format(len(targets_tensors),
                                                                len(outputs_batch),
                                                                len(self.loss_function))

#12

Hi @kamil.kaczmarek

Could you please let me know how to go about assigning a loss function to my PyTorchLeNet5(Model) class?


#13

Hi @elvisdowson

We wanted to make the loss very flexible, so that we could weight different parts of the loss in multi-output situations. That resulted in the following API:

self.loss = [('NAME', loss_function, weight), ..., ('NAME', loss_function, weight)]

For example, with just one loss function you would go with:

self.loss = [('loss', loss_function, 1.0)]
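
In your PyTorchLeNet5 that would be set in __init__, something like the sketch below. Two caveats: your traceback checks self.loss_function, so I use that attribute name here rather than self.loss; and nn.NLLLoss() is only an assumption that fits a LeNet5 whose forward ends in log_softmax; if it returns raw logits, use nn.CrossEntropyLoss() instead.

import torch.nn as nn
import torch.optim as optim

class PyTorchLeNet5(Model):
    def __init__(self, architecture_config, training_config, callbacks_config):
        super().__init__(architecture_config, training_config, callbacks_config)
        self.model = LeNet5(**architecture_config['model_params'])
        self.weight_regularization = weight_regularization_lenet5
        self.optimizer = optim.Adam(
            self.weight_regularization(self.model, **architecture_config['regularizer_params']),
            **architecture_config['optimizer_params'])
        # one (name, loss_function, weight) triple per model output;
        # LeNet5 has a single output, so a single triple with weight 1.0
        self.loss_function = [('nll', nn.NLLLoss(), 1.0)]  # assumption: log_softmax output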