Structure the code of your import/export processes with Dataflow

Import/export processes with Dataflow

17 Dec 2024

It all started with a prospect asking us to audit their site, which was suffering from performance problems. For the purposes of this article, I will set aside the pages that took 40 seconds to load and focus on the topic at hand: the data import scripts.

In short, the process consists of querying a PIM through an API that exposes product data organized as a tree, then creating or updating the corresponding products in the application. The largest script processed barely 30,000 objects and took 24 hours. To put that in context, this script is supposed to run every day.

Unsurprisingly, the code turned out to resemble a plate of long, thin, cylindrical pasta, a great classic of Italian cuisine. Reading and writing mixed together in classes hundreds of lines long, if and switch statements galore, copy-paste in every direction: the result was unreadable, non-extensible, unmaintainable code. Hidden in the middle of all this were the main causes of the slowness: writes that were either redundant (the same object written several times) or useless (the local object was already up to date).

Trying to salvage the existing code would have been futile, so I decided to rewrite the whole thing on top of Dataflow. Let's look at the building blocks of the new code.

Note: The following code is an anonymized and simplified version of the actual code. The goal is to show how the code is split into classes, not to go into the implementation details of this particular import.

Reading

First, the reader, whose role is to produce a stream of objects to be processed.

   
<pre><?php

declare(strict_types=1);

namespace App\DataflowType\ProductImport;

class Reader
{
    public function __construct(private readonly PIMClient $pimClient)
    {
    }

    // $config contains the import configuration
    public function __invoke(Config $config): iterable
    {
        yield from $this->readLevel($config->rootId);
    }

    public function readLevel(int $parentId): iterable
    {
        // the PIM API returns children of a tree node as JSON
        foreach ($this->pimClient->getChildren($parentId) as $item) {
            // PIMItem is a POPO to encapsulate data for further processing
            yield new PIMItem($parentId, $item);
            // Recursive call, as we're querying a tree
            yield from $this->readLevel($item['id']);
        }
    }
}
</pre>
   

This class has a simple role: walk the PIM tree and yield each node for processing, one at a time, as the rest of the dataflow consumes them.
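To see the traversal order on its own, here is a self-contained sketch that replaces the PIM API with an in-memory array. InMemoryPIMClient, the stripped-down PIMItem and the sample tree are hypothetical stand-ins; only the recursion mirrors Reader::readLevel(). Because the generators are nested with yield from, nodes come out depth-first, each parent before its children.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the PIM API: maps a parent ID to its children.
final class InMemoryPIMClient
{
    /** @param array<int, list<array{id: int, type: string}>> $tree */
    public function __construct(private readonly array $tree)
    {
    }

    /** @return list<array{id: int, type: string}> */
    public function getChildren(int $parentId): array
    {
        return $this->tree[$parentId] ?? [];
    }
}

// Minimal version of the article's PIMItem POPO.
final class PIMItem
{
    public function __construct(
        public readonly int $parentId,
        public readonly array $data,
    ) {
    }
}

// Same recursion as Reader::readLevel(): yield the node, then its whole subtree.
function readLevel(InMemoryPIMClient $client, int $parentId): iterable
{
    foreach ($client->getChildren($parentId) as $item) {
        yield new PIMItem($parentId, $item);
        yield from readLevel($client, $item['id']);
    }
}

$client = new InMemoryPIMClient([
    1 => [['id' => 10, 'type' => 'category'], ['id' => 20, 'type' => 'category']],
    10 => [['id' => 11, 'type' => 'product']],
    20 => [['id' => 21, 'type' => 'range']],
    21 => [['id' => 22, 'type' => 'product']],
]);

// preserve_keys = false: yield from reuses the inner generators' keys
$items = iterator_to_array(readLevel($client, 1), false);
$order = array_map(fn (PIMItem $i) => $i->data['id'], $items);

echo implode(', ', $order), PHP_EOL; // 10, 11, 20, 21, 22
```

The second argument to iterator_to_array() matters here: yield from keeps the keys produced by the inner generators, so with preserved keys several items share the key 0 and overwrite each other. A plain foreach over the values is not affected.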

Transformation

Next comes a group of transformers, which convert the received data into the structure expected by the writers.

   
<pre><?php

declare(strict_types=1);

namespace App\DataflowType\ProductImport;

abstract class AbstractTransformer implements TransformerInterface
{
    public function __construct(private readonly ContentService $contentService)
    {
    }

    // ContentStructure is the class expected by the writer
    public function transform(Config $config, PIMItem $item): ContentStructure|false
    {
        // $remoteId is used to match the PIM element to the local element
        $remoteId = RemoteIdMaker::makeRemoteId($config, $item->data['id']);
        try {
            $this->contentService->loadContentInfoByRemoteId($remoteId);

            return $this->makeUpdateStructure($config, $item);
        } catch (NotFoundException) {
            return $this->makeCreateStructure($config, $item);
        }
    }

    private function makeCreateStructure(Config $config, PIMItem $item): ContentCreateStructure
    {
        $remoteId = RemoteIdMaker::makeRemoteId($config, $item['id']);
        MemoryCache::addHandledRemoteId($remoteId);

        return new ContentCreateStructure(
            $this->contentTypeIdentifier(),
            $this->fields($config, $item),
            $remoteId,
        );
    }

    private function makeUpdateStructure(Config $config, PIMItem $item): ContentUpdateStructure|false
    {
        $remoteId = RemoteIdMaker::makeRemoteId($config, $item['id']);
        // Elements can be present multiple times in the PIM tree,
        // we make sure to process them only once
        if (MemoryCache::isRemoteIdHandled($remoteId)) {
            return false;
        }

        MemoryCache::addHandledRemoteId($remoteId);

        return new ContentUpdateStructure(
            $remoteId,
            $this->fields($config, $item),
        );
    }

    abstract protected function contentTypeIdentifier(): string;
    abstract protected function fields(Config $config, PIMItem $item): array;
}
</pre>
   

We have one transformer per object type (category, product, range); the abstract class above is their common parent and groups the shared behavior.
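The TransformerRegistry used in the dataflow type is not shown in the article. A minimal sketch, assuming it simply maps the PIM type value to a transformer and fails loudly on unknown types, could look like this. The interface and the CategoryTransformer and ProductTransformer classes are simplified, hypothetical stand-ins that return arrays instead of ContentStructure objects.

```php
<?php

declare(strict_types=1);

// Simplified stand-in for the article's TransformerInterface.
interface TransformerInterface
{
    public function transform(array $data): array|false;
}

final class CategoryTransformer implements TransformerInterface
{
    public function transform(array $data): array|false
    {
        return ['contentType' => 'category', 'name' => $data['label']];
    }
}

final class ProductTransformer implements TransformerInterface
{
    public function transform(array $data): array|false
    {
        return ['contentType' => 'product', 'name' => $data['label'], 'sku' => $data['sku']];
    }
}

// Hypothetical registry: one transformer per PIM object type.
final class TransformerRegistry
{
    /** @var array<string, TransformerInterface> */
    private array $transformers = [];

    public function add(string $type, TransformerInterface $transformer): void
    {
        $this->transformers[$type] = $transformer;
    }

    public function getForType(string $type): TransformerInterface
    {
        // Unknown types are a configuration error, not something to skip silently
        return $this->transformers[$type]
            ?? throw new InvalidArgumentException(sprintf('No transformer registered for type "%s".', $type));
    }
}

$registry = new TransformerRegistry();
$registry->add('category', new CategoryTransformer());
$registry->add('product', new ProductTransformer());

$result = $registry->getForType('product')->transform(['label' => 'Chair', 'sku' => 'CH-01']);
```

Adding a new object type then means writing one transformer and registering it, without touching the dataflow type. In a Symfony application the registry could be filled from tagged services rather than by explicit add() calls; that wiring is left out here.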

Filtering

Next, we only want to write the objects whose local version is out of date with respect to the PIM.

   
<pre><?php

declare(strict_types=1);

namespace App\DataflowType\ProductImport;

class NotModifiedFilter
{
    public function __construct(
        private readonly ContentService $contentService,
        private readonly FieldComparator $comparator,
    ) {
    }

    public function filterNotModified(mixed $item): mixed
    {
        // Only updates should be filtered
        if (!$item instanceof ContentUpdateStructure) {
            return $item;
        }

        // We retrieve the local object
        $content = $this->contentService->loadContentByRemoteId($item->remoteId);
        // compare() returns true when the local and PIM fields are identical
        if ($this->comparator->compare($content->fields, $item->fields)) {
            // Nothing has changed: returning false drops the item, so nothing is written
            return false;
        }

        // There is a difference, we continue the process
        return $item;
    }
}
</pre>
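FieldComparator is where the useless writes are caught, but its implementation depends on the field types involved and is not shown. Here is a minimal sketch, assuming both sides have already been normalized to arrays of scalar values keyed by field identifier (real CMS field objects would need converting first). compare() returns true when the values are identical, which is how NotModifiedFilter uses it.

```php
<?php

declare(strict_types=1);

// Hypothetical comparator: true when the PIM values match the local ones.
final class FieldComparator
{
    /**
     * @param array<string, scalar|null> $localFields  values currently stored locally
     * @param array<string, scalar|null> $remoteFields values computed from the PIM
     */
    public function compare(array $localFields, array $remoteFields): bool
    {
        foreach ($remoteFields as $identifier => $value) {
            // A field missing locally, or holding a different value, means an update is needed
            if (!array_key_exists($identifier, $localFields) || $localFields[$identifier] !== $value) {
                return false;
            }
        }

        // Only the fields produced by the transformer are compared; extra local fields are ignored
        return true;
    }
}

$comparator = new FieldComparator();
$local = ['name' => 'Chair', 'sku' => 'CH-01', 'editorial_note' => 'Bestseller'];

$unchanged = $comparator->compare($local, ['name' => 'Chair', 'sku' => 'CH-01']);
$changed = $comparator->compare($local, ['name' => 'Armchair', 'sku' => 'CH-01']);
```

Comparing only the fields the import manages means that fields edited locally, and not owned by the PIM, never trigger a write. The strict comparison is deliberate but means 1 and '1' count as different, so values should be normalized to the same types before comparing.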
   

Writing

That leaves the writers, whose only job is to persist the objects they receive. There is no complex logic here: all the formatting has already been done by the transformers.
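The writers are not shown either. The DelegatorWriter used in the dataflow type keeps each of them small, because a writer only ever receives the kind of object it handles. The following is a simplified, self-contained model of that routing idea: ItemWriter, Delegator, CreateRequest and UpdateRequest are hypothetical names, not the bundle's classes, and the writers record remote IDs instead of calling a CMS.

```php
<?php

declare(strict_types=1);

// Hypothetical interface modelling a writer that declares what it can handle.
interface ItemWriter
{
    public function supports(object $item): bool;

    public function write(object $item): void;
}

final class CreateRequest
{
    public function __construct(public readonly string $remoteId) {}
}

final class UpdateRequest
{
    public function __construct(public readonly string $remoteId) {}
}

final class CreateWriter implements ItemWriter
{
    /** @var list<string> */
    public array $created = [];

    public function supports(object $item): bool
    {
        return $item instanceof CreateRequest;
    }

    public function write(object $item): void
    {
        $this->created[] = $item->remoteId; // a real writer would create the content here
    }
}

final class UpdateWriter implements ItemWriter
{
    /** @var list<string> */
    public array $updated = [];

    public function supports(object $item): bool
    {
        return $item instanceof UpdateRequest;
    }

    public function write(object $item): void
    {
        $this->updated[] = $item->remoteId; // a real writer would update the content here
    }
}

// Routes each item to the first delegate that supports it.
final class Delegator
{
    /** @param list<ItemWriter> $delegates */
    public function __construct(private readonly array $delegates) {}

    public function write(object $item): void
    {
        foreach ($this->delegates as $delegate) {
            if ($delegate->supports($item)) {
                $delegate->write($item);

                return;
            }
        }

        throw new LogicException('No writer supports ' . $item::class);
    }
}

$createWriter = new CreateWriter();
$updateWriter = new UpdateWriter();
$writer = new Delegator([$createWriter, $updateWriter]);

$writer->write(new CreateRequest('pim-10'));
$writer->write(new UpdateRequest('pim-11'));
$writer->write(new CreateRequest('pim-12'));
```

Throwing when no delegate matches turns a forgotten writer into an immediate error instead of silently lost data.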

Dataflow

We put all this together in our dataflow type.

   
<pre><?php

declare(strict_types=1);

namespace App\DataflowType\ProductImport;

use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\DelegatorWriter;

class ImportDataflowType extends AbstractDataflowType
{
    public function __construct(
        private readonly Reader $reader,
        private readonly TransformerRegistry $transformerRegistry,
        private readonly NotModifiedFilter $notModifiedFilter,
        private readonly ContentCreateWriter $contentCreateWriter,
        private readonly ContentUpdateWriter $contentUpdateWriter,
    ) {
    }

    public function getLabel(): string
    {
        return 'PIM import';
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // Initializing cache and configuration
        MemoryCache::reset();
        $config = new Config($options);

        // We use the DelegatorWriter to separate the creation and update writing
        $writer = new DelegatorWriter();
        $writer->addDelegate($this->contentCreateWriter);
        $writer->addDelegate($this->contentUpdateWriter);

        $builder
            ->setReader(($this->reader)($config))
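            ->addStep(function (PIMItem $item) use ($config) {
                // A registry is used to call the correct transformer for the item type
                return $this->transformerRegistry->getForType($item->data['type'])->transform($config, $item);
            })
            // First-class callable syntax (PHP 8.1): NotModifiedFilter has no __invoke() method
            ->addStep($this->notModifiedFilter->filterNotModified(...))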
            ->addWriter($writer)
        ;
    }
}
</pre>
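To make the control flow of buildDataflow() concrete, here is a toy model of a reader, steps and writer pipeline with the same shape as this import. It is not how DataflowBuilder is implemented; runPipeline() and the sample data are hypothetical. It uses the convention the transformer and the filter above rely on: a step returning false drops the item, so later steps and the writer never see it. The duplicate node and the unchanged product correspond to the redundant and useless writes of the original script.

```php
<?php

declare(strict_types=1);

/**
 * Toy pipeline: each item read goes through the steps in order;
 * a step returning false drops the item, otherwise the result is written.
 *
 * @param iterable<mixed>               $reader
 * @param list<callable(mixed): mixed>  $steps
 * @param callable(mixed): void         $writer
 */
function runPipeline(iterable $reader, array $steps, callable $writer): int
{
    $written = 0;
    foreach ($reader as $item) {
        foreach ($steps as $step) {
            $item = $step($item);
            if ($item === false) {
                continue 2; // skip to the next item read
            }
        }
        $writer($item);
        ++$written;
    }

    return $written;
}

// Hypothetical data: product 11 appears twice in the tree, product 12 is already up to date.
$pimNodes = [
    ['id' => 11, 'name' => 'Chair'],
    ['id' => 12, 'name' => 'Table'],
    ['id' => 11, 'name' => 'Chair'],
    ['id' => 13, 'name' => 'Lamp'],
];
$localNames = [11 => 'Old chair', 12 => 'Table'];

$handled = []; // plays the role of MemoryCache
$log = [];

$written = runPipeline(
    $pimNodes,
    [
        // Deduplication (done in the transformer step in the article): each remote ID once
        function (array $node) use (&$handled): array|false {
            if (isset($handled[$node['id']])) {
                return false;
            }
            $handled[$node['id']] = true;

            return $node;
        },
        // Same role as NotModifiedFilter: drop items identical to the local version
        fn (array $node): array|false => ($localNames[$node['id']] ?? null) === $node['name'] ? false : $node,
    ],
    function (array $node) use (&$log): void {
        $log[] = $node['id'];
    },
);
```

Of the four nodes read, only products 11 and 13 reach the writer. Putting the cheap in-memory deduplication before the filter also means duplicates never trigger the filter's lookup of the local object.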
   

Conclusion

Dataflow encourages breaking your code into several classes, each with a single responsibility, which makes it easier to produce clean, maintainable and extensible code.