Data sources

Data sources are the nodes of Met.3D’s data pipeline. Each source accepts a data request, produces one data item, caches the result in the memory manager, and signals completion asynchronously. Sources chain together: a processing source holds a reference to one or more upstream sources and builds on their output.

Source code: src/data/abstractdatasource.h, src/data/memorymanageddatasource.h, src/data/scheduleddatasource.h, src/data/processingwpdatasource.h.

Class hierarchy

The hierarchy adds one responsibility per layer:

MAbstractDataSource
 └── MMemoryManagedDataSource   (caching + input source registration)
      └── MScheduledDataSource  (async scheduling via task graphs)
           └── MWeatherPredictionDataSource  (weather metadata interface)
                ├── MWeatherPredictionReader          (file readers)
                └── MProcessingWeatherPredictionDataSource  (filter base)
                     └── MSingleInputProcessingWeatherPredictionDataSource
                          └── MSmoothFilter, MVerticalRegridder, …

MAbstractDataSource defines the public interface: requestData(), getData(), and requiredKeys().

MMemoryManagedDataSource adds the memory manager, the input source registry, and the logic that builds requiredKeys() by aggregating the keys of all registered input sources (with optional prefixes, see Data requests).

MScheduledDataSource adds the task-graph-based async scheduler. Subclasses implement createTaskGraph() and produceData(); the base class handles cache lookup, thread dispatch, and completion signalling.

MWeatherPredictionDataSource adds the weather metadata discovery interface (availableVariables(), availableInitTimes(), etc.) and is the base for all data sources that produce Grid types.

MProcessingWeatherPredictionDataSource adds grid creation utilities (createAndInitializeResultGrid()) for filters that produce a grid of the same type as their input.

MSingleInputProcessingWeatherPredictionDataSource is the convenience base for single-input filters. It pre-implements all metadata delegation methods (forwarding to the input source) and enables request pass-through by default.
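The split between MScheduledDataSource and its subclasses follows the template-method pattern. The base class owns the public entry point and the cache lookup, and a subclass only supplies produceData(). The following is a minimal synchronous sketch of that split and of chaining a filter onto a reader. All class names are hypothetical, a request is modelled as a plain string and a data item as a double, and the real framework dispatches work through the scheduler rather than calling produceData() inline.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-ins: a request is a plain string and a data item is a
// double. Neither matches the real MDataRequest or grid types, and getData()
// here runs synchronously instead of through the scheduler.
using Request = std::string;

class SketchSource
{
public:
    virtual ~SketchSource() = default;

    // Non-virtual entry point owned by the base class: cache lookup first,
    // produceData() only on a cache miss.
    double getData(const Request& request)
    {
        auto it = cache.find(request);
        if (it != cache.end()) return it->second;  // cache hit
        double item = produceData(request);        // cache miss
        cache[request] = item;
        ++producedCount;
        return item;
    }

    int producedCount = 0;  // how often produceData() actually ran

protected:
    // The only method a subclass has to provide.
    virtual double produceData(const Request& request) = 0;

private:
    std::map<Request, double> cache;
};

// Root of a chain: a "reader" with no upstream source.
class SketchReader : public SketchSource
{
protected:
    double produceData(const Request&) override { return 2.0; }
};

// A "filter" that holds a pointer to its upstream source and builds on it.
class SketchDoublingFilter : public SketchSource
{
public:
    explicit SketchDoublingFilter(SketchSource* input) : input(input) {}

protected:
    double produceData(const Request& request) override
    {
        return 2.0 * input->getData(request);
    }

private:
    SketchSource* input;
};
```

Requesting the same item twice from the filter runs each produceData() exactly once. The second call is served from the filter's cache.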

The request/response flow

Requesting data from a source always goes through an asynchronous interface:

QFuture<void> future = source->requestData(request);

// Wait until the request has completed, preferably by connecting
// to the source's dataRequestCompleted() signal.
MDataLease<MStructuredGrid> grid = source->getData(request);

Internally, requestData() calls createTaskGraph() to build a dependency tree of MTask objects, submits the task graph to the scheduler, and returns immediately. The scheduler executes tasks in worker threads, respecting dependencies (parents before children). When execution completes, the dataRequestCompleted(MDataRequest) signal is emitted and the QFuture is resolved.

If the requested item is already in the cache, requestData() skips scheduling entirely and completes synchronously.
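The ordering rule (parents before children) and the cache short-circuit can be illustrated with a depth-first walk over the task graph. This is a sequential sketch under simplifying assumptions. SketchTask and runTaskGraph() are hypothetical, a cached request is represented by its presence in a std::set, and the real scheduler runs independent tasks concurrently in worker threads.

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Hypothetical task node: all parents must finish before this task runs.
struct SketchTask
{
    std::string request;
    std::vector<std::shared_ptr<SketchTask>> parents;
};

// Runs a task graph sequentially in dependency order (parents before
// children). Requests already in `cache` are not executed again, and their
// parents are skipped too. `log` records the order in which tasks ran.
void runTaskGraph(const std::shared_ptr<SketchTask>& task,
                  std::set<std::string>& cache,
                  std::vector<std::string>& log)
{
    if (cache.count(task->request)) return;  // cache hit: skip the subtree
    for (const auto& parent : task->parents)
        runTaskGraph(parent, cache, log);    // upstream work first
    log.push_back(task->request);            // stands in for produceData()
    cache.insert(task->request);
}
```

Because finished requests are recorded in the cache, a diamond-shaped graph, where two filters share one reader, also runs the shared parent only once.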

Memory management is handled by the framework. After a subclass's produceData() returns, the base class hands the returned item to the memory manager via store() and keeps a lease on it alive until all child tasks have finished. See Memory management for details.
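A lease is essentially a scoped reference count. The memory manager may only evict an item when no lease on it is active. The following is a minimal RAII sketch of that idea, assuming a memory manager keyed by request string. SketchMemoryManager and SketchLease are hypothetical and do not reproduce the interface of MDataLease or the real memory manager.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical memory manager: owns items and counts active leases.
class SketchMemoryManager
{
public:
    void store(const std::string& key, std::unique_ptr<double> item)
    {
        items[key] = Entry{std::move(item), 0};  // manager takes ownership
    }
    double* acquire(const std::string& key)
    {
        auto it = items.find(key);
        if (it == items.end()) return nullptr;
        ++it->second.leases;
        return it->second.item.get();
    }
    void release(const std::string& key) { --items.at(key).leases; }
    int leaseCount(const std::string& key) const { return items.at(key).leases; }

    // Only items without active leases may be evicted.
    bool evict(const std::string& key)
    {
        auto it = items.find(key);
        if (it == items.end() || it->second.leases > 0) return false;
        items.erase(it);
        return true;
    }

private:
    struct Entry { std::unique_ptr<double> item; int leases; };
    std::map<std::string, Entry> items;
};

// RAII lease: acquires in the constructor, releases in the destructor.
class SketchLease
{
public:
    SketchLease(SketchMemoryManager& mm, std::string key)
        : mm(mm), key(std::move(key)), item(mm.acquire(this->key)) {}
    ~SketchLease() { if (item) mm.release(key); }
    SketchLease(const SketchLease&) = delete;
    SketchLease& operator=(const SketchLease&) = delete;

    explicit operator bool() const { return item != nullptr; }
    double& operator*() const { return *item; }

private:
    SketchMemoryManager& mm;
    std::string key;
    double* item;
};
```

Tying the release to the destructor means that an early return, such as the empty-grid fallback in produceData(), cannot leak a lease.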

Input source registration

A source declares its upstream dependencies by calling registerInputSource(). This does two things: it merges the upstream source’s requiredKeys() into this source’s own combined key list, and it records which source to forward sub-requests to.

// Single input, no prefix (most common case):
registerInputSource(upstreamSource);

// Two inputs distinguished by prefix:
registerInputSource(sourceU, "U_");
registerInputSource(sourceV, "V_");

When a prefix is given, each of that source’s required keys is prefixed in the combined key list, so callers must include the prefixed keys in their requests. Prefixes are useful whenever a source has two inputs of the same type (for example, two file readers) and needs to route different parts of the request to each one. See Data requests for how to construct and parse prefixed requests with subRequest().
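The prefixing scheme described above can be sketched with two small functions. One builds the combined key list and the other extracts the part of a request addressed to one input and strips the prefix. This is an illustration only. Requests are modelled as a std::map, and combinedRequiredKeys() and extractSubRequest() are hypothetical helpers, not the real subRequest() API. The keys VARIABLE, INIT_TIME and MODE are placeholders.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Request = std::map<std::string, std::string>;  // hypothetical: key -> value
using Keys = std::vector<std::string>;

// Local keys plus every input's keys, each prefixed with that input's prefix.
Keys combinedRequiredKeys(const Keys& local,
                          const std::vector<std::pair<std::string, Keys>>& inputs)
{
    Keys combined = local;
    for (const auto& input : inputs)         // input = (prefix, keys)
        for (const auto& key : input.second)
            combined.push_back(input.first + key);
    return combined;
}

// Keeps only the entries that carry `prefix` and strips it, producing the
// request that is forwarded to that input.
Request extractSubRequest(const Request& request, const std::string& prefix)
{
    Request sub;
    for (const auto& entry : request)
        if (entry.first.compare(0, prefix.size(), prefix) == 0)
            sub[entry.first.substr(prefix.size())] = entry.second;
    return sub;
}
```

With inputs registered under "U_" and "V_", the combined list contains U_VARIABLE and V_VARIABLE side by side. Each reader receives a plain VARIABLE key in its own sub-request.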

To swap an input source at runtime (e.g. when the user changes a dataset), call deregisterInputSource() followed by registerInputSource() with the new source.

Warning

Re-registering an input source while a data request is scheduled for execution can cause a race condition: the running request may have been built against the old required-keys list while the new registration has already replaced it. Only re-register input sources when you can guarantee that no requests are currently being processed by this source. See this issue for more information.
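The warning above leaves it to the caller to guarantee that no requests are in flight. One way to enforce this in code rather than by convention is to count in-flight requests under a mutex and refuse the swap while the count is non-zero. The following is a hypothetical sketch of that idea (SketchInputSlot is not part of Met.3D). The input is represented by a name, and the same mutex guards the counter and the input so the check and the swap cannot interleave with a new request.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical guard: tracks in-flight requests and only allows an input
// swap (deregister + register) when none are being processed.
class SketchInputSlot
{
public:
    explicit SketchInputSlot(std::string input) : input(std::move(input)) {}

    // Called when a request is scheduled; returns the input it is built against.
    std::string beginRequest()
    {
        std::lock_guard<std::mutex> lock(mutex);
        ++inFlight;
        return input;
    }

    // Called when the request has completed.
    void endRequest()
    {
        std::lock_guard<std::mutex> lock(mutex);
        --inFlight;
    }

    // Replaces the input atomically; refused while requests are running.
    bool tryReplaceInput(std::string newInput)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (inFlight > 0) return false;
        input = std::move(newInput);
        return true;
    }

private:
    std::mutex mutex;
    int inFlight = 0;
    std::string input;
};
```

A caller that gets false back can retry after the pending dataRequestCompleted() signals have arrived.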

Implementing a processing source

Derive from MSingleInputProcessingWeatherPredictionDataSource for filters that consume one upstream source. Three methods must be overridden.

1. Declare locally consumed keys

locallyRequiredKeys() returns only the keys this source itself reads from the request — not keys that will be forwarded upstream. The set must be complete: two requests that differ only in locally required keys must produce different data items, and two requests with identical locally required keys (and identical upstream keys) must always produce the same item. This is what makes the memory cache correct.

const QStringList MSmoothFilter::locallyRequiredKeys()
{
    return { "SMOOTH" };
}
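To see why completeness matters, consider a cache that identifies items by the values of their required keys. The sketch below assumes, for illustration only, that the cache key is the sorted concatenation of required key/value pairs. cacheKey() is hypothetical and the real MDataRequest encoding may differ. If SMOOTH were missing from the list, two differently smoothed results would collide on one cache entry.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

using Request = std::map<std::string, std::string>;  // hypothetical: key -> value

// Builds a cache key from the values of the required keys only, in sorted
// key order, so unrelated request entries do not affect caching.
std::string cacheKey(const Request& request, std::vector<std::string> required)
{
    std::sort(required.begin(), required.end());
    std::string key;
    for (const auto& name : required)
    {
        auto it = request.find(name);
        key += name + "=" + (it != request.end() ? it->second : "") + ";";
    }
    return key;
}
```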

2. Build the task graph

createTaskGraph() creates this source’s MTask and wires it to its parent tasks. Strip the local keys before forwarding, so the upstream source receives only its own required keys:

MTask* MSmoothFilter::createTaskGraph(MDataRequest request)
{
    MTask* task = new MTask(request, this);

    MDataRequestHelper rh(request);
    rh.removeAll(locallyRequiredKeys());
    task->addParent(inputSource->getTaskGraph(rh.request()));

    return task;
}
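Stripping the local keys also lets upstream items be shared. Two smoothing requests that differ only in SMOOTH forward identical sub-requests, so the upstream item is produced once and then served from the cache. The following sketch mirrors createTaskGraph() with hypothetical stand-ins: SketchTask, removeAll() and createSmoothTaskGraph() are not the real MTask or MDataRequestHelper API, and requests are modelled as a std::map.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

using Request = std::map<std::string, std::string>;  // hypothetical: key -> value

struct SketchTask
{
    Request request;
    std::vector<std::shared_ptr<SketchTask>> parents;
};

// Mirrors rh.removeAll(locallyRequiredKeys()): drops the filter's own keys.
Request removeAll(Request request, const std::vector<std::string>& keys)
{
    for (const auto& key : keys) request.erase(key);
    return request;
}

// Filter task whose single parent is the upstream task for the stripped request.
std::shared_ptr<SketchTask> createSmoothTaskGraph(const Request& request)
{
    auto task = std::make_shared<SketchTask>();
    task->request = request;
    auto parent = std::make_shared<SketchTask>();
    parent->request = removeAll(request, {"SMOOTH"});
    task->parents.push_back(parent);
    return task;
}
```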

3. Produce the data

produceData() runs in a worker thread. Parse the request, acquire input data, create a result grid, compute, and return the heap-allocated result. The memory manager takes ownership of the returned pointer.

MStructuredGrid* MSmoothFilter::produceData(MDataRequest request)
{
    MDataRequestHelper rh(request);

    // Read locally required parameters
    int filterType = rh.intValue("SMOOTH");

    // Strip local keys and fetch input
    rh.removeAll(locallyRequiredKeys());
    MDataLease<MStructuredGrid> inputGrid = inputSource->getData(rh.request());
    // The input item could not be produced.
    if (!inputGrid) return createEmptyGrid();

    // Create result grid of the same type as the input
    MStructuredGrid* result = createAndInitializeResultGrid(inputGrid);

    // ... perform computation ...

    return result;  // memory manager takes ownership
}

createAndInitializeResultGrid() copies grid dimensions, coordinate arrays, and (for hybrid sigma-pressure and auxiliary pressure grids) the associated pressure fields into the new grid, so the result is a fully initialised grid of the correct type ready to be filled with values.
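For a plain regular grid, the initialisation amounts to copying the coordinate arrays and allocating a data array of matching size. The following is a heavily simplified sketch. SketchGrid and createAndInitializeResultGridSketch() are hypothetical, the zero fill is an arbitrary choice for the example, and the pressure fields that the real utility carries over for hybrid sigma-pressure and auxiliary pressure grids are omitted.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, heavily simplified regular grid.
struct SketchGrid
{
    std::vector<double> lons, lats, levels;
    std::vector<float> data;  // nlev * nlat * nlon values
};

// Copies dimensions and coordinate arrays from `input` and allocates a data
// array of matching size. Values are zero until the filter fills them in.
// (Pressure fields of hybrid or auxiliary-pressure grids are not modelled.)
SketchGrid createAndInitializeResultGridSketch(const SketchGrid& input)
{
    SketchGrid result;
    result.lons = input.lons;
    result.lats = input.lats;
    result.levels = input.levels;
    result.data.assign(
        input.levels.size() * input.lats.size() * input.lons.size(), 0.0f);
    return result;
}
```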

Wiring into the pipeline

Call setInputSource() (provided by MSingleInputProcessingWeatherPredictionDataSource) to connect the filter to its upstream source. This internally calls registerInputSource() and enables request pass-through:

smoothFilter->setMemoryManager(memoryManager);
smoothFilter->setInputSource(upstreamSource);

Weather prediction metadata interface

MWeatherPredictionDataSource exposes methods for querying what data is available, which actors use to populate their variable/time/member selectors:

QList<MVerticalLevelType> availableLevelTypes();
QStringList               availableVariables(MVerticalLevelType levelType);
QSet<unsigned int>        availableEnsembleMembers(MVerticalLevelType levelType,
                                                   const QString& variable);
QList<QDateTime>          availableInitTimes(MVerticalLevelType levelType,
                                             const QString& variable);
QList<QDateTime>          availableValidTimes(MVerticalLevelType levelType,
                                              const QString& variable,
                                              const QDateTime& initTime);

Processing sources derived from MSingleInputProcessingWeatherPredictionDataSource do not need to implement these. They are delegated to the input source automatically. Filters that change the available set (e.g. a regridder that changes the level type) must override the relevant methods.
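The delegation pattern can be sketched with a single metadata method. The single-input base forwards the query to its input by default, and a filter that changes the level type overrides it. All names below are hypothetical. LevelType stands in for MVerticalLevelType, and the regridder's answer is chosen only to illustrate the override.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for MVerticalLevelType.
enum class LevelType { HybridSigmaPressure, Pressure };

class SketchWPSource
{
public:
    virtual ~SketchWPSource() = default;
    virtual std::vector<LevelType> availableLevelTypes() = 0;
};

class SketchReader : public SketchWPSource
{
public:
    std::vector<LevelType> availableLevelTypes() override
    { return {LevelType::HybridSigmaPressure}; }
};

// Default behaviour of a single-input filter: forward metadata queries.
class SketchSingleInputFilter : public SketchWPSource
{
public:
    explicit SketchSingleInputFilter(SketchWPSource* input) : input(input) {}
    std::vector<LevelType> availableLevelTypes() override
    { return input->availableLevelTypes(); }

protected:
    SketchWPSource* input;
};

// A regridder changes the level type, so it overrides the delegation.
class SketchPressureRegridder : public SketchSingleInputFilter
{
public:
    using SketchSingleInputFilter::SketchSingleInputFilter;
    std::vector<LevelType> availableLevelTypes() override
    { return {LevelType::Pressure}; }
};
```

A smoothing filter built on SketchSingleInputFilter reports exactly what its reader reports. The regridder reports the level type it produces instead.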