# Source code for parcels.particlefile

"""Module controlling the writing of ParticleSets to NetCDF file"""
import numpy as np
import netCDF4
from datetime import timedelta as delta
from parcels.loggers import logger

__all__ = ['ParticleFile']

class ParticleFile(object):
    """Initialise a netCDF4.Dataset for trajectory output.

    The output follows the format outlined in the Discrete Sampling
    Geometries section of the CF-conventions; the current implementation is
    based on the NCEI NetCDF Trajectory Template v2.0.

    Both 'Orthogonal multidimensional array' and 'Indexed ragged array'
    representations are implemented. The former is simpler to post-process,
    but the latter is required when particles will be added during
    ``.execute`` (i.e. the number of particles in the pset increases).

    Developer note: we cannot use xray.Dataset here, since it does not yet
    allow incremental writes to disk.

    :param name: Basename of the output file (".nc" is appended)
    :param particleset: ParticleSet to output
    :param type: Either 'array' for default matrix style, or 'indexed' for
                 indexed ragged array
    """

    def __init__(self, name, particleset, type='array'):
        self.type = type
        self.name = name
        # Tracks the last time stamp written, so repeated calls to write()
        # at the same model time do not duplicate records.
        self.lasttime_written = None

        self.dataset = netCDF4.Dataset("%s.nc" % name, "w", format="NETCDF4")
        self.dataset.createDimension("obs", None)
        if self.type == 'array':
            # Fixed number of trajectories, unlimited observations per trajectory
            self.dataset.createDimension("trajectory", particleset.size)
            coords = ("trajectory", "obs")
        elif self.type == 'indexed':
            # Ragged array: a single unlimited dimension shared by all variables
            coords = ("obs",)
        else:
            raise RuntimeError("ParticleFile type must be either 'array' or 'indexed'")
        self.dataset.feature_type = "trajectory"
        self.dataset.Conventions = "CF-1.6/CF-1.7"
        self.dataset.ncei_template_version = "NCEI_NetCDF_Trajectory_Template_v2.0"

        # Create ID variable according to CF conventions
        if self.type == 'array':
            self.id = self.dataset.createVariable("trajectory", "i4", ("trajectory",))
            self.id.long_name = "Unique identifier for each particle"
            self.id.cf_role = "trajectory_id"
            self.id[:] = np.array([p.id for p in particleset])
        elif self.type == 'indexed':
            self.id = self.dataset.createVariable("trajectory", "i4", ("obs",))
            self.id.long_name = "index of trajectory this obs belongs to"

        # Create time, lat, lon and z variables according to CF conventions:
        self.time = self.dataset.createVariable("time", "f8", coords, fill_value=np.nan)
        self.time.long_name = ""
        self.time.standard_name = "time"
        if particleset.time_origin == 0:
            self.time.units = "seconds"
        else:
            self.time.units = "seconds since " + str(particleset.time_origin)
        self.time.calendar = "julian"
        self.time.axis = "T"

        self.lat = self.dataset.createVariable("lat", "f4", coords, fill_value=np.nan)
        self.lat.long_name = ""
        self.lat.standard_name = "latitude"
        self.lat.units = "degrees_north"
        self.lat.axis = "Y"

        self.lon = self.dataset.createVariable("lon", "f4", coords, fill_value=np.nan)
        self.lon.long_name = ""
        self.lon.standard_name = "longitude"
        self.lon.units = "degrees_east"
        self.lon.axis = "X"

        self.z = self.dataset.createVariable("z", "f4", coords, fill_value=np.nan)
        self.z.long_name = ""
        self.z.standard_name = "depth"
        self.z.units = "m"
        self.z.positive = "down"

        # Any additional user-defined particle variables flagged for output
        self.user_vars = []
        for v in particleset.ptype.variables:
            if v.name in ['time', 'lat', 'lon', 'depth', 'z', 'id']:
                continue  # already written above under their CF names
            if v.to_write is True:
                setattr(self, v.name,
                        self.dataset.createVariable(v.name, "f4", coords, fill_value=np.nan))
                getattr(self, v.name).long_name = ""
                getattr(self, v.name).standard_name = v.name
                getattr(self, v.name).units = "unknown"
                self.user_vars += [v.name]

        # Next free index: the 'obs' column ('array') or row offset ('indexed')
        self.idx = 0

    def __del__(self):
        self.dataset.close()

    def sync(self):
        """Write all buffered data to disk"""
        self.dataset.sync()

    def write(self, pset, time, sync=True):
        """Write :class:`parcels.particleset.ParticleSet` data to file

        :param pset: ParticleSet object to write
        :param time: Time at which to write ParticleSet
        :param sync: Optional argument whether to write data to disk
                     immediately. Default is True
        """
        if isinstance(time, delta):
            time = time.total_seconds()
        if self.lasttime_written != time:
            # only write if 'time' hasn't been written yet
            self.lasttime_written = time
            if self.type == 'array':
                if pset.size > 0:
                    # Check if largest particle ID is smaller than the last ID
                    # in the ParticleFile. Otherwise, new particles have been
                    # added and netcdf will fail
                    if max([p.id for p in pset]) > self.id[:][-1]:
                        logger.error("Number of particles appears to increase. Use type='indexed' for ParticleFile")
                    # Find the indices (inds) of the particle IDs in the
                    # ParticleFile, because particles can have been deleted
                    pids = [p.id for p in pset]
                    inds = np.in1d(self.id[:], pids, assume_unique=True)
                    inds = np.arange(len(self.id[:]))[inds]
                    self.time[inds, self.idx] = time
                    self.lat[inds, self.idx] = np.array([p.lat for p in pset])
                    self.lon[inds, self.idx] = np.array([p.lon for p in pset])
                    self.z[inds, self.idx] = np.array([p.depth for p in pset])
                    for var in self.user_vars:
                        getattr(self, var)[inds, self.idx] = np.array([getattr(p, var) for p in pset])
                else:
                    logger.warning("ParticleSet is empty on writing as array")
                self.idx += 1
            elif self.type == 'indexed':
                # Append pset.size new rows at the current offset
                ind = np.arange(pset.size) + self.idx
                self.id[ind] = np.array([p.id for p in pset])
                self.time[ind] = time
                self.lat[ind] = np.array([p.lat for p in pset])
                self.lon[ind] = np.array([p.lon for p in pset])
                self.z[ind] = np.array([p.depth for p in pset])
                for var in self.user_vars:
                    getattr(self, var)[ind] = np.array([getattr(p, var) for p in pset])
                self.idx += pset.size
        if sync:
            self.sync()