Data sources
============

Data sources are the nodes of Met.3D's data pipeline. Each source accepts a
:doc:`data request <data_requests>`, produces one data item, caches the result
in the memory manager, and signals completion asynchronously. Sources chain
together: a processing source holds a reference to one or more upstream sources
and builds on their output.

Source code: ``src/data/abstractdatasource.h``,
``src/data/memorymanageddatasource.h``, ``src/data/scheduleddatasource.h``,
``src/data/processingwpdatasource.h``.

Class hierarchy
---------------

The hierarchy adds one responsibility per layer:

.. code-block:: text

    MAbstractDataSource
    └── MMemoryManagedDataSource   (caching + input source registration)
        └── MScheduledDataSource   (async scheduling, usage of task graphs)
            └── MWeatherPredictionDataSource   (weather metadata interface)
                ├── MWeatherPredictionReader   (file readers)
                └── MProcessingWeatherPredictionDataSource   (filter base)
                    └── MSingleInputProcessingWeatherPredictionDataSource
                        └── MSmoothFilter, MVerticalRegridder, …

``MAbstractDataSource`` defines the public interface: ``requestData()``,
``acquireData()``, and ``requiredKeys()``.

``MMemoryManagedDataSource`` adds the memory manager, the input source
registry, and the logic that builds ``requiredKeys()`` by aggregating the keys
of all registered input sources (with optional prefixes, see
:doc:`data_requests`).

``MScheduledDataSource`` adds the task-graph-based async scheduler. Subclasses
implement ``createTaskGraph()`` and ``produceData()``; the base class handles
cache lookup, thread dispatch, and completion signalling.

``MWeatherPredictionDataSource`` adds the weather metadata discovery interface
(``availableVariables()``, ``availableInitTimes()``, etc.) and is the base for
all data sources that produce :doc:`grid_types`.

``MProcessingWeatherPredictionDataSource`` adds grid creation utilities
(``createAndInitializeResultGrid()``) for filters that produce a grid of the
same type as their input.

``MSingleInputProcessingWeatherPredictionDataSource`` is the convenience base
for single-input filters. It pre-implements all metadata delegation methods
(forwarding to the input source) and enables request pass-through by default.

The request/response flow
--------------------------

Requesting data from a source is always asynchronous:

.. code-block:: c++

    QFuture future = source->requestData(request);

    // Wait for the request to finish, ideally by connecting
    // to the source's dataRequestCompleted() signal.

    MDataLease grid = source->getData(request);

Internally, ``requestData()`` calls ``createTaskGraph()`` to build a dependency
tree of ``MTask`` objects, submits the task graph to the scheduler, and returns
immediately. The scheduler executes tasks in worker threads, respecting
dependencies (parents before children). When execution completes, the
``dataRequestCompleted(MDataRequest)`` signal is emitted and the ``QFuture`` is
resolved.

If the result is already in the cache, ``requestData()`` recognises the task as
invalid, skips scheduling entirely, and completes synchronously.

Memory management is handled by the framework: the base class calls
``produceData()``, hands the returned item to the memory manager via
``store()``, and keeps a lease alive until all child tasks have finished. See
:doc:`../01_system_modules/memory_management` for details.

Input source registration
--------------------------

A source declares its upstream dependencies by calling
``registerInputSource()``. This does two things: it merges the upstream
source's ``requiredKeys()`` into this source's own combined key list, and it
records which source to forward sub-requests to.

.. code-block:: c++

    // Single input, no prefix (most common case):
    registerInputSource(upstreamSource);

    // Two inputs distinguished by prefix:
    registerInputSource(sourceU, "U_");
    registerInputSource(sourceV, "V_");

When a prefix is given, each of that source's required keys is prefixed in the
combined key list, so callers must include the prefixed keys in their requests.
Prefixes are useful whenever a source has two inputs of the same type (for
example, two file readers) and needs to route different parts of the request to
each one. See :doc:`data_requests` for how to construct and parse prefixed
requests with ``subRequest()``.

To swap an input source at runtime (e.g. when the user changes a dataset), call
``deregisterInputSource()`` followed by ``registerInputSource()`` with the new
source.

.. warning::

    Re-registering an input source while a data request is scheduled for
    execution can cause a race condition: the running request may have been
    built against the old required-keys list while the new registration has
    already replaced it. Only re-register input sources when you can guarantee
    that no requests are currently being processed by this source. See the
    corresponding issue report for more information.

Implementing a processing source
----------------------------------

Derive from ``MSingleInputProcessingWeatherPredictionDataSource`` for filters
that consume one upstream source. Three methods must be overridden.

1. Declare locally consumed keys
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

``locallyRequiredKeys()`` returns only the keys this source itself reads from
the request, not keys that will be forwarded upstream. The set must be
complete: two requests that differ only in locally required keys must produce
different data items, and two requests with identical locally required keys
(and identical upstream keys) must always produce the same item. This is what
makes the memory cache correct.

.. code-block:: c++

    const QStringList MSmoothFilter::locallyRequiredKeys()
    {
        return { "SMOOTH" };
    }

2. Build the task graph
~~~~~~~~~~~~~~~~~~~~~~~

``createTaskGraph()`` creates this source's ``MTask`` and wires it to its
parent tasks. Strip the local keys before forwarding, so the upstream source
receives only its own required keys:

.. code-block:: c++

    MTask* MSmoothFilter::createTaskGraph(MDataRequest request)
    {
        MTask* task = new MTask(request, this);

        // Remove this filter's own keys before asking the input source
        // for its part of the task graph.
        MDataRequestHelper rh(request);
        rh.removeAll(locallyRequiredKeys());
        task->addParent(inputSource->getTaskGraph(rh.request()));

        return task;
    }

3. Produce the data
~~~~~~~~~~~~~~~~~~~~

``produceData()`` runs in a worker thread. Parse the request, acquire input
data, create a result grid, compute, and return the heap-allocated result. The
memory manager takes ownership of the returned pointer.

.. code-block:: c++

    MStructuredGrid* MSmoothFilter::produceData(MDataRequest request)
    {
        MDataRequestHelper rh(request);

        // Read locally required parameters
        int filterType = rh.intValue("SMOOTH");

        // Strip local keys and fetch input
        rh.removeAll(locallyRequiredKeys());
        MDataLease inputGrid = inputSource->getData(rh.request());

        // The input item could not be produced.
        if (!inputGrid) return createEmptyGrid();

        // Create result grid of the same type as the input
        MStructuredGrid* result = createAndInitializeResultGrid(inputGrid);

        // ... perform computation ...

        return result; // memory manager takes ownership
    }

``createAndInitializeResultGrid()`` copies grid dimensions, coordinate arrays,
and (for hybrid sigma-pressure and auxiliary pressure grids) the associated
pressure fields into the new grid, so the result is a fully initialised grid of
the correct type ready to be filled with values.

Wiring into the pipeline
~~~~~~~~~~~~~~~~~~~~~~~~~

Call ``setInputSource()`` (provided by
``MSingleInputProcessingWeatherPredictionDataSource``) to connect the filter to
its upstream source. This internally calls ``registerInputSource()`` and
enables request pass-through:

.. code-block:: c++

    smoothFilter->setMemoryManager(memoryManager);
    smoothFilter->setInputSource(upstreamSource);

Weather prediction metadata interface
---------------------------------------

``MWeatherPredictionDataSource`` exposes methods for querying what data is
available, which actors use to populate their variable/time/member selectors:

.. code-block:: c++

    QList availableLevelTypes();
    QStringList availableVariables(MVerticalLevelType levelType);
    QSet availableEnsembleMembers(MVerticalLevelType,
                                  const QString& variable);
    QList availableInitTimes(MVerticalLevelType,
                             const QString& variable);
    QList availableValidTimes(MVerticalLevelType,
                              const QString& variable,
                              const QDateTime& initTime);

Processing sources derived from
``MSingleInputProcessingWeatherPredictionDataSource`` do not need to implement
these methods; they are delegated to the input source automatically. Filters
that change the available set (e.g. a regridder that changes the level type)
must override the relevant methods.
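
The difference between delegating and overriding can be sketched with a few
stand-in classes. This is a minimal illustration under stated assumptions, not
Met.3D code: ``SketchSource``, ``SketchReader``, ``SketchSingleInputFilter``,
``SketchSmoothFilter`` and ``SketchRegridder`` are hypothetical names,
``std::string`` stands in for ``MVerticalLevelType`` and ``QString``, and the
level type strings are placeholders.

.. code-block:: c++

    #include <string>
    #include <vector>

    // Hypothetical stand-in for the metadata part of a weather prediction
    // source. Plain strings replace MVerticalLevelType and QString.
    class SketchSource
    {
    public:
        virtual ~SketchSource() = default;
        virtual std::vector<std::string> availableLevelTypes() = 0;
        virtual std::vector<std::string> availableVariables(
                const std::string& levelType) = 0;
    };

    // Stand-in for a file reader that knows what the dataset contains.
    class SketchReader : public SketchSource
    {
    public:
        std::vector<std::string> availableLevelTypes() override
        { return { "HYBRID_SIGMA_PRESSURE" }; }

        std::vector<std::string> availableVariables(
                const std::string& levelType) override
        {
            if (levelType == "HYBRID_SIGMA_PRESSURE") return { "T", "U", "V" };
            return {};
        }
    };

    // Stand-in for the single-input base class: every metadata query is
    // forwarded to the input. Assumes setInputSource() was called first.
    class SketchSingleInputFilter : public SketchSource
    {
    public:
        void setInputSource(SketchSource* source) { inputSource = source; }

        std::vector<std::string> availableLevelTypes() override
        { return inputSource->availableLevelTypes(); }

        std::vector<std::string> availableVariables(
                const std::string& levelType) override
        { return inputSource->availableVariables(levelType); }

    protected:
        SketchSource* inputSource = nullptr;
    };

    // A smoothing filter does not change what is available: no overrides.
    class SketchSmoothFilter : public SketchSingleInputFilter {};

    // A regridder that outputs pressure levels reports the new level type
    // and maps variable queries back to the level type of its input.
    class SketchRegridder : public SketchSingleInputFilter
    {
    public:
        std::vector<std::string> availableLevelTypes() override
        { return { "PRESSURE_LEVELS" }; }

        std::vector<std::string> availableVariables(
                const std::string& levelType) override
        {
            if (levelType != "PRESSURE_LEVELS") return {};
            return inputSource->availableVariables("HYBRID_SIGMA_PRESSURE");
        }
    };

In this sketch, a selector that queries ``SketchSmoothFilter`` sees exactly what
the reader offers, while one that queries ``SketchRegridder`` sees only the
output level type. An actual regridder would likely need to override the
ensemble member and time queries in the same way.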