Source code for parcels.particlefile

"""Module controlling the writing of ParticleSets to NetCDF file"""
import numpy as np
import netCDF4
from datetime import timedelta as delta
from parcels.loggers import logger
from os import path

__all__ = ['ParticleFile']

[docs]class ParticleFile(object): """Initialise netCDF4.Dataset for trajectory output. The output follows the format outlined in the Discrete Sampling Geometries section of the CF-conventions: The current implementation is based on the NCEI template: Developer note: We cannot use xray.Dataset here, since it does not yet allow incremental writes to disk: :param name: Basename of the output file :param particleset: ParticleSet to output :param outputdt: Interval which dictates the update frequency of file output while ParticleFile is given as an argument of ParticleSet.execute() It is either a timedelta object or a positive double. :param write_ondelete: Boolean to write particle data only when they are deleted. Default is False """ def __init__(self, name, particleset, outputdt=np.infty, write_ondelete=False): = name self.write_ondelete = write_ondelete self.outputdt = outputdt self.lasttraj = 0 # id of last particle written self.lasttime_written = None # variable to check if time has been written already extension = path.splitext(str(name))[1] fname = name if extension in ['.nc', '.nc4'] else "" % name self.dataset = netCDF4.Dataset(fname, "w", format="NETCDF4") self.dataset.createDimension("obs", None) self.dataset.createDimension("traj", None) coords = ("traj", "obs") self.dataset.feature_type = "trajectory" self.dataset.Conventions = "CF-1.6/CF-1.7" self.dataset.ncei_template_version = "NCEI_NetCDF_Trajectory_Template_v2.0" # Create ID variable according to CF conventions = self.dataset.createVariable("trajectory", "i4", coords) = "Unique identifier for each particle" = "trajectory_id" # Create time, lat, lon and z variables according to CF conventions: self.time = self.dataset.createVariable("time", "f8", coords, fill_value=np.nan) self.time.long_name = "" self.time.standard_name = "time" if particleset.time_origin == 0: self.time.units = "seconds" else: self.time.units = "seconds since " + str(particleset.time_origin) self.time.calendar = "julian" self.time.axis = "T" = self.dataset.createVariable("lat", "f4", coords, fill_value=np.nan) = "" = "latitude" = "degrees_north" = "Y" self.lon = self.dataset.createVariable("lon", "f4", coords, fill_value=np.nan) self.lon.long_name = "" self.lon.standard_name = "longitude" self.lon.units = "degrees_east" self.lon.axis = "X" self.z = self.dataset.createVariable("z", "f4", coords, fill_value=np.nan) self.z.long_name = "" self.z.standard_name = "depth" self.z.units = "m" self.z.positive = "down" self.user_vars = [] self.user_vars_once = [] """ :user_vars: list of additional user defined particle variables to write for all particles and all times :user_vars_once: list of additional user defined particle variables to write for all particles only once at initial time. """ for v in particleset.ptype.variables: if in ['time', 'lat', 'lon', 'depth', 'z', 'id']: continue if v.to_write: if v.to_write is True: setattr(self,, self.dataset.createVariable(, "f4", coords, fill_value=np.nan)) self.user_vars += [] elif v.to_write == 'once': setattr(self,, self.dataset.createVariable(, "f4", "traj", fill_value=np.nan)) self.user_vars_once += [] getattr(self, = "" getattr(self, = getattr(self, = "unknown" self.idx = np.empty(shape=0) def __del__(self): self.dataset.close()
[docs] def sync(self): """Write all buffered data to disk""" self.dataset.sync()
[docs] def write(self, pset, time, sync=True, deleted_only=False): """Write :class:`parcels.particleset.ParticleSet` data to file :param pset: ParticleSet object to write :param time: Time at which to write ParticleSet :param sync: Optional argument whether to write data to disk immediately. Default is True """ if isinstance(time, delta): time = time.total_seconds() if self.lasttime_written != time and \ (self.write_ondelete is False or deleted_only is True): if pset.size > 0: first_write = [p for p in pset if p.fileid < 0 or len(self.idx) == 0] # len(self.idx)==0 in case pset is written to new ParticleFile for p in first_write: p.fileid = self.lasttraj self.lasttraj += 1 self.idx = np.append(self.idx, np.zeros(len(first_write))) for p in pset: i = p.fileid[i, self.idx[i]] = self.time[i, self.idx[i]] = time[i, self.idx[i]] = self.lon[i, self.idx[i]] = p.lon self.z[i, self.idx[i]] = p.depth for var in self.user_vars: getattr(self, var)[i, self.idx[i]] = getattr(p, var) for p in first_write: for var in self.user_vars_once: getattr(self, var)[p.fileid] = getattr(p, var) else: logger.warning("ParticleSet is empty on writing as array") if not deleted_only: self.idx += 1 self.lasttime_written = time if sync: self.sync()