Data sources
Data sources are the nodes of Met.3D’s data pipeline. Each source accepts a data request, produces one data item, caches the result in the memory manager, and signals completion asynchronously. Sources chain together: a processing source holds a reference to one or more upstream sources and builds on their output.
Source code: src/data/abstractdatasource.h, src/data/memorymanageddatasource.h,
src/data/scheduleddatasource.h, src/data/processingwpdatasource.h.
Class hierarchy
The hierarchy adds one responsibility per layer:
MAbstractDataSource
└── MMemoryManagedDataSource (caching + input source registration)
    └── MScheduledDataSource (async scheduling, usage of task graphs)
        └── MWeatherPredictionDataSource (weather metadata interface)
            ├── MWeatherPredictionReader (file readers)
            └── MProcessingWeatherPredictionDataSource (filter base)
                └── MSingleInputProcessingWeatherPredictionDataSource
                    └── MSmoothFilter, MVerticalRegridder, …
MAbstractDataSource defines the public interface: requestData(),
getData(), and requiredKeys().
MMemoryManagedDataSource adds the memory manager, the input source registry,
and the logic that builds requiredKeys() by aggregating the keys of all
registered input sources (with optional prefixes, see Data requests).
MScheduledDataSource adds the task-graph-based async scheduler. Subclasses
implement createTaskGraph() and produceData(); the base class handles
cache lookup, thread dispatch, and completion signalling.
MWeatherPredictionDataSource adds the weather metadata discovery interface
(availableVariables(), availableInitTimes(), etc.) and is the base for all
data sources that produce Grid types.
MProcessingWeatherPredictionDataSource adds grid creation utilities
(createAndInitializeResultGrid()) for filters that produce a grid of the same
type as their input.
MSingleInputProcessingWeatherPredictionDataSource is the convenience base for
single-input filters. It pre-implements all metadata delegation methods (forwarding
to the input source) and enables request pass-through by default.
The request/response flow
Requesting data from a source is always asynchronous:
QFuture<void> future = source->requestData(request);
// Wait until the request has finished, preferably by connecting
// to the source's dataRequestCompleted() signal.
MDataLease<MStructuredGrid> grid = source->getData(request);
Internally, requestData() calls createTaskGraph() to build a dependency tree
of MTask objects, submits the task graph to the scheduler, and returns
immediately. The scheduler executes tasks in worker threads, respecting dependencies
(parents before children). When execution completes, the dataRequestCompleted(MDataRequest)
signal is emitted and the QFuture is resolved.
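The parents-before-children ordering can be illustrated with a small, single-threaded toy graph. This is only a sketch: ToyTask, execute() and exampleExecutionOrder() are hypothetical names invented for this illustration, and the real scheduler runs MTask objects on worker threads rather than recursing on one thread.

```cpp
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task node; this is not Met.3D's MTask.
struct ToyTask
{
    std::string name;
    std::vector<std::shared_ptr<ToyTask>> parents;

    explicit ToyTask(std::string n) : name(std::move(n)) {}
    void addParent(std::shared_ptr<ToyTask> p) { parents.push_back(std::move(p)); }
};

// Depth-first traversal: run all parents first, then the task itself.
// A task reachable via several paths is executed only once.
void execute(const std::shared_ptr<ToyTask>& task,
             std::unordered_set<const ToyTask*>& visited,
             std::vector<std::string>& log)
{
    if (!visited.insert(task.get()).second) return; // already handled
    for (const auto& parent : task->parents) execute(parent, visited, log);
    log.push_back(task->name); // "executing" a task just records its name
}

// Diamond-shaped example: one reader feeds two filters that are combined.
std::vector<std::string> exampleExecutionOrder()
{
    auto reader = std::make_shared<ToyTask>("reader");
    auto smoothU = std::make_shared<ToyTask>("smoothU");
    auto smoothV = std::make_shared<ToyTask>("smoothV");
    auto combine = std::make_shared<ToyTask>("combine");
    smoothU->addParent(reader);
    smoothV->addParent(reader);
    combine->addParent(smoothU);
    combine->addParent(smoothV);

    std::unordered_set<const ToyTask*> visited;
    std::vector<std::string> log;
    execute(combine, visited, log);
    return log; // reader, smoothU, smoothV, combine
}
```

The shared reader task appears once in the log even though two filters depend on it, which is the property a task graph offers over independently issued requests.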
If the result is already in the cache, requestData() recognises the task as
invalid, skips scheduling entirely, and completes synchronously.
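The difference between a cache miss and a cache hit can be sketched without any threading. In the toy below, a queue stands in for the scheduler and a callback stands in for the dataRequestCompleted() signal. All names (ToySource, runScheduler(), the boolean return value) are assumptions made for this illustration and do not mirror the actual Met.3D signatures.

```cpp
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical miniature of the request flow; none of this is Met.3D API.
class ToySource
{
public:
    using Producer = std::function<int(const std::string&)>;
    using Callback = std::function<void(const std::string&)>;

    ToySource(Producer produce, Callback completed)
        : produce_(std::move(produce)), completed_(std::move(completed)) {}

    // Returns true if work was scheduled, false if the item was already cached.
    bool requestData(const std::string& request)
    {
        if (cache_.count(request))
        {
            completed_(request); // cache hit: complete right away, schedule nothing
            return false;
        }
        pending_.push_back(request);
        return true;
    }

    // Stand-in for the scheduler's worker threads: drain the queue in order.
    void runScheduler()
    {
        while (!pending_.empty())
        {
            std::string request = pending_.front();
            pending_.pop_front();
            if (!cache_.count(request)) cache_.emplace(request, produce_(request));
            completed_(request);
        }
    }

    // Valid only after the request has completed.
    int getData(const std::string& request) const { return cache_.at(request); }

private:
    Producer produce_;
    Callback completed_;
    std::map<std::string, int> cache_;
    std::deque<std::string> pending_;
};
```

A second request for the same item triggers the completion callback without invoking the producer again, which mirrors the cached path described above.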
Memory management is handled by the framework: once produceData() returns, the base
class hands the returned item to the memory manager via store() and keeps a lease
alive until all child tasks have finished. See Memory management for details.
Input source registration
A source declares its upstream dependencies by calling registerInputSource(). This
does two things: it merges the upstream source’s requiredKeys() into this source’s
own combined key list, and it records which source to forward sub-requests to.
// Single input, no prefix (most common case):
registerInputSource(upstreamSource);
// Two inputs distinguished by prefix:
registerInputSource(sourceU, "U_");
registerInputSource(sourceV, "V_");
When a prefix is given, each of that source’s required keys is prefixed in the
combined key list, so callers must include the prefixed keys in their requests.
Prefixes are useful whenever a source has two inputs of the same type (for example,
two file readers) and needs to route different parts of the request to each one.
See Data requests for how to construct and parse prefixed requests with
subRequest().
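How prefixed keys end up in the combined key list can be sketched with a toy class. ToyKeySource and the key names VARIABLE, VALID_TIME and MODE are hypothetical and chosen only for illustration. The real aggregation lives in MMemoryManagedDataSource and may differ in detail, for example in how duplicates are handled.

```cpp
#include <algorithm>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a source with a required-keys list.
class ToyKeySource
{
public:
    explicit ToyKeySource(std::vector<std::string> localKeys)
        : keys_(std::move(localKeys)) {}

    // Merge the upstream keys into this source's list, prepending the
    // prefix to each key and skipping keys that are already present.
    void registerInputSource(const ToyKeySource& upstream,
                             const std::string& prefix = "")
    {
        const std::vector<std::string> upstreamKeys = upstream.requiredKeys();
        for (const std::string& key : upstreamKeys)
        {
            const std::string combined = prefix + key;
            if (std::find(keys_.begin(), keys_.end(), combined) == keys_.end())
                keys_.push_back(combined);
        }
    }

    const std::vector<std::string>& requiredKeys() const { return keys_; }

private:
    std::vector<std::string> keys_;
};

// Two readers with identical keys, told apart by prefix.
std::vector<std::string> exampleCombinedKeys()
{
    ToyKeySource readerU(std::vector<std::string>{"VARIABLE", "VALID_TIME"});
    ToyKeySource readerV(std::vector<std::string>{"VARIABLE", "VALID_TIME"});
    ToyKeySource windSpeed(std::vector<std::string>{"MODE"});
    windSpeed.registerInputSource(readerU, "U_");
    windSpeed.registerInputSource(readerV, "V_");
    // MODE, U_VARIABLE, U_VALID_TIME, V_VARIABLE, V_VALID_TIME
    return windSpeed.requiredKeys();
}
```

Without the prefixes, both readers would contribute the same two key names, and the combined source could not tell which value belongs to which input.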
To swap an input source at runtime (e.g. when the user changes a dataset), call
deregisterInputSource() followed by registerInputSource() with the new source.
Warning
Re-registering an input source while a data request is scheduled for execution can cause a race condition: the running request may have been built against the old required-keys list while the new registration has already replaced it. Only re-register input sources when you can guarantee that no requests are currently being processed by this source. See this issue for more information.
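One possible way to enforce that guarantee in calling code is to count in-flight requests and refuse the swap while the count is non-zero. The sketch below is an assumption about how such a guard could look. GuardedInputSlot and its methods are hypothetical and not part of Met.3D, and a source name stands in for the actual source pointer.

```cpp
#include <mutex>
#include <string>
#include <utility>

// Hypothetical guard object; not part of Met.3D.
class GuardedInputSlot
{
public:
    explicit GuardedInputSlot(std::string sourceName)
        : sourceName_(std::move(sourceName)) {}

    // Call when a request starts; returns the input it must use throughout.
    std::string beginRequest()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        ++inFlight_;
        return sourceName_;
    }

    // Call when that request has completed.
    void endRequest()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        --inFlight_;
    }

    // Swap the input only while no request is in flight.
    // Returns false if the swap was refused, so the caller can retry later.
    bool trySwap(const std::string& newSourceName)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (inFlight_ != 0) return false;
        sourceName_ = newSourceName;
        return true;
    }

private:
    std::mutex mutex_;
    std::string sourceName_;
    int inFlight_ = 0;
};
```

Because the check and the swap happen under the same lock that beginRequest() takes, a request can never observe a half-replaced input in this toy model.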
Implementing a processing source
Derive from MSingleInputProcessingWeatherPredictionDataSource for filters that
consume one upstream source. Three methods must be overridden.
1. Declare locally consumed keys
locallyRequiredKeys() returns only the keys this source itself reads from the
request, not the keys that are forwarded upstream. The set must be complete: two
requests that differ in the value of any locally required key must produce different
data items, and two requests with identical values for all locally required keys (and
identical upstream keys) must always produce the same item. This is what makes the
memory cache correct.
const QStringList MSmoothFilter::locallyRequiredKeys()
{
    return { "SMOOTH" };
}
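Why the key set has to be complete becomes clearer with a toy cache key. The sketch assumes, purely for illustration, that a request is a key/value map and that the cache key is built from the required keys only. ToyRequest, cacheKey() and the key names other than SMOOTH are hypothetical, and the memory manager's actual key construction may differ.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical request representation: key/value pairs.
using ToyRequest = std::map<std::string, std::string>;

// Build a cache key from exactly the keys a source declares as required.
// Keys outside that list do not influence the result.
std::string cacheKey(const ToyRequest& request,
                     const std::vector<std::string>& requiredKeys)
{
    std::string key;
    for (const std::string& name : requiredKeys)
    {
        const auto it = request.find(name);
        const std::string value = (it != request.end()) ? it->second : std::string();
        key += name + "=" + value + ";";
    }
    return key;
}
```

If SMOOTH were missing from the required keys, requests with SMOOTH=1 and SMOOTH=2 would map to the same cache key, and the second request would be served the first request's item.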
2. Build the task graph
createTaskGraph() creates this source’s MTask and wires it to its parent
tasks. Strip the local keys before forwarding, so the upstream source receives only
its own required keys:
MTask* MSmoothFilter::createTaskGraph(MDataRequest request)
{
    MTask* task = new MTask(request, this);
    MDataRequestHelper rh(request);
    rh.removeAll(locallyRequiredKeys());
    task->addParent(inputSource->getTaskGraph(rh.request()));
    return task;
}
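The effect of stripping local keys at every stage can be traced through a chain of toy nodes. ToyNode, ToyRequest and the REGRID key are hypothetical and exist only for this sketch. In Met.3D the stripping is done with MDataRequestHelper as shown above.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical request representation: key/value pairs.
using ToyRequest = std::map<std::string, std::string>;

// Hypothetical pipeline node that records what it receives and forwards
// the request upstream after removing its own keys.
struct ToyNode
{
    std::vector<std::string> localKeys; // keys this node reads itself
    ToyNode* upstream = nullptr;        // nullptr marks the start of the chain
    ToyRequest received;                // last request that reached this node

    void handle(ToyRequest request)
    {
        received = request;
        if (upstream == nullptr) return;
        for (const std::string& key : localKeys) request.erase(key);
        upstream->handle(request);
    }
};
```

With a regridder on top of a smoothing filter on top of a reader, each stage sees one key fewer than the stage below it, and the reader receives only the keys it understands.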
3. Produce the data
produceData() runs in a worker thread. Parse the request, acquire input data,
create a result grid, compute, and return the heap-allocated result. The memory
manager takes ownership of the returned pointer.
MStructuredGrid* MSmoothFilter::produceData(MDataRequest request)
{
    MDataRequestHelper rh(request);
    // Read the locally required parameters.
    int filterType = rh.intValue("SMOOTH");
    // Strip the local keys and fetch the input grid.
    rh.removeAll(locallyRequiredKeys());
    MDataLease<MStructuredGrid> inputGrid = inputSource->getData(rh.request());
    // The input item could not be produced.
    if (!inputGrid) return createEmptyGrid();
    // Create a result grid of the same type as the input.
    MStructuredGrid* result = createAndInitializeResultGrid(inputGrid);
    // ... perform computation ...
    return result; // memory manager takes ownership
}
createAndInitializeResultGrid() copies grid dimensions, coordinate arrays, and
(for hybrid sigma-pressure and auxiliary pressure grids) the associated pressure
fields into the new grid, so the result is a fully initialised grid of the correct
type ready to be filled with values.
Wiring into the pipeline
Call setInputSource() (provided by MSingleInputProcessingWeatherPredictionDataSource)
to connect the filter to its upstream source. This internally calls
registerInputSource() and enables request pass-through:
smoothFilter->setMemoryManager(memoryManager);
smoothFilter->setInputSource(upstreamSource);
Weather prediction metadata interface
MWeatherPredictionDataSource exposes methods for querying what data is available,
which actors use to populate their variable/time/member selectors:
QList<MVerticalLevelType> availableLevelTypes();
QStringList availableVariables(MVerticalLevelType levelType);
QSet<unsigned int> availableEnsembleMembers(MVerticalLevelType, const QString& variable);
QList<QDateTime> availableInitTimes(MVerticalLevelType, const QString& variable);
QList<QDateTime> availableValidTimes(MVerticalLevelType, const QString& variable,
                                     const QDateTime& initTime);
Processing sources derived from MSingleInputProcessingWeatherPredictionDataSource
do not need to implement these. They are delegated to the input source automatically.
Filters that change the available set (e.g. a regridder that changes the level type)
must override the relevant methods.
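The split between default delegation and selective overriding can be sketched with plain virtual functions. Everything here is a simplified assumption: the Toy classes, the use of strings for level types, and the level-type and variable names are hypothetical, and the real interface uses the Qt types listed above.

```cpp
#include <string>
#include <vector>

// Hypothetical, reduced metadata interface (two of the five queries).
struct ToyMetadataSource
{
    virtual ~ToyMetadataSource() = default;
    virtual std::vector<std::string> availableLevelTypes() const = 0;
    virtual std::vector<std::string> availableVariables(const std::string& levelType) const = 0;
};

// Toy reader offering two variables on one level type.
struct ToyReader : ToyMetadataSource
{
    std::vector<std::string> availableLevelTypes() const override
    {
        return {"HYBRID_SIGMA_PRESSURE"};
    }
    std::vector<std::string> availableVariables(const std::string& levelType) const override
    {
        if (levelType == "HYBRID_SIGMA_PRESSURE") return {"T", "U"};
        return {};
    }
};

// Default behaviour of a single-input filter: forward every query.
struct ToySingleInputFilter : ToyMetadataSource
{
    explicit ToySingleInputFilter(const ToyMetadataSource& in) : input(in) {}
    std::vector<std::string> availableLevelTypes() const override
    {
        return input.availableLevelTypes();
    }
    std::vector<std::string> availableVariables(const std::string& levelType) const override
    {
        return input.availableVariables(levelType);
    }

protected:
    const ToyMetadataSource& input;
};

// A regridder changes the level type, so it overrides the affected queries.
struct ToyRegridder : ToySingleInputFilter
{
    using ToySingleInputFilter::ToySingleInputFilter;
    std::vector<std::string> availableLevelTypes() const override
    {
        return {"PRESSURE_LEVELS"};
    }
    std::vector<std::string> availableVariables(const std::string& levelType) const override
    {
        if (levelType != "PRESSURE_LEVELS") return {};
        // Offer what the input provides on its own level type.
        return input.availableVariables("HYBRID_SIGMA_PRESSURE");
    }
};
```

A plain filter built on ToySingleInputFilter reports exactly what its input reports, while the regridder advertises the new level type and maps variable queries back to the input's level type.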