Get echo data generated by a moored echosounder#

This notebook uses EK60 echosounder data from the U.S. Ocean Observatories Initiative (OOI). The raw data are converted, combined, and calibrated using echopype, and the calibrated MVBS output is stored as .nc files either locally or on the cloud. echoshader can then lazily load these calibrated files to plot echograms; this lazy-loading mechanism avoids the memory overflow that would result from processing all of the raw data at once.
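
As a minimal sketch of this lazy-loading idea (assuming the per-file MVBS outputs have already been written as MVBS_*.nc, as done later in this notebook; the glob path and date range below are only illustrative), xarray's open_mfdataset builds dask-backed arrays, so values are read from disk only when a plot or computation actually needs them:

import xarray as xr

# Open all per-file MVBS outputs lazily; the path pattern is illustrative
mvbs = xr.open_mfdataset("OOI/MVBS_*.nc", combine="by_coords")

# Selections stay lazy; nothing has been read from disk yet
subset = mvbs.sel(ping_time=slice("2017-08-21", "2017-08-22"))

# Data are pulled into memory only when explicitly loaded (or plotted)
subset.load()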

Data are from the OOI Oregon Offshore Cabled Shallow Profiler Mooring, collected on August 1-31, 2017. This period includes the date of a solar eclipse, during which the reduced sunlight affected the regular diel vertical migration (DVM) patterns of marine life. This change was directly observed with the upward-looking echosounder mounted on this mooring platform, which happened to be within the zone of totality.

Establish file system connection and process raw files with echopype#

from pathlib import Path
import itertools as it
import datetime as dt
from dateutil import parser as dtparser

import fsspec
import gc

import echopype as ep
import xarray as xr
# Output naming and MVBS binning parameters
data_type = 'Moored'
range_meter_bin = 0.1   # depth bin size in meters
ping_time_bin = '1s'    # ping time bin size
start_datetime = dt.datetime(2017, 8, 1, 0, 0)
end_datetime = dt.datetime(2017, 8, 30, 23, 59)

base_dpath = Path(
    f"./{data_type}_range={range_meter_bin}_ping={ping_time_bin}"
    f"_start={start_datetime.strftime('%Y-%m-%d-%H-%M')}"
    f"_end={end_datetime.strftime('%Y-%m-%d-%H-%M')}"
)
base_dpath.mkdir(exist_ok=True)

calibrated_dpath = (base_dpath / 'OOI')
calibrated_dpath.mkdir(exist_ok=True)
# Browse the OOI raw data server over HTTPS and collect the .raw file URLs
# for each day in the requested range
fs = fsspec.filesystem('https')
ooi_raw_url = (
    "https://rawdata.oceanobservatories.org/files/"
    "CE04OSPS/PC01B/ZPLSCB102_10.33.10.143/2017/08"
)
desired_day_urls = [
    f"{ooi_raw_url}/{str(day).zfill(2)}"
    for day in range(start_datetime.day, end_datetime.day + 1)
]
all_raw_file_urls = it.chain.from_iterable(
    fs.glob(f"{day_url}/*.raw") for day_url in desired_day_urls
)
def in_range(raw_file: str, start: dt.datetime, end: dt.datetime) -> bool:
    """Check if file url is in datetime range"""
    file_name = Path(raw_file).name
    file_datetime = dtparser.parse(file_name, fuzzy=True)
    return start <= file_datetime <= end
desired_raw_file_urls = list(filter(
    lambda raw_file: in_range(
        raw_file, 
        start_datetime,
        end_datetime
    ), 
    all_raw_file_urls
))

print(f"There are {len(desired_raw_file_urls)} raw files within the specified datetime range.")
for desired_raw_file_url in desired_raw_file_urls:
    raw_fpath = Path(desired_raw_file_url)
    
    # Allow up to two attempts per file to tolerate transient download errors
    try_times = 0
    while True:
        try_times += 1

        if try_times > 2:
            print(f"Failed to process raw file {raw_fpath.name}.")
            break
        
        try:
            # Stream the raw file directly from the OOI HTTPS server and
            # convert it into an EchoData object in memory
            ed = ep.open_raw(
                desired_raw_file_url,
                sonar_model='ek60'
            )

            # Use the EchoData object "ed" to generate calibrated and
            # computed MVBS files that will be saved to netcdf
            ds_Sv = ep.calibrate.compute_Sv(ed)

            ds_MVBS = ep.commongrid.compute_MVBS(
                ds_Sv,
                range_meter_bin=range_meter_bin,  # in meters
                ping_time_bin=ping_time_bin  # in seconds
            )

            ds_MVBS.to_netcdf(calibrated_dpath / f"MVBS_{raw_fpath.stem}.nc")
            
            print(f"MVBS_{raw_fpath.stem}.nc created")

            # release to avoid memory overflow
            del ed
            del ds_Sv
            del ds_MVBS
            
            gc.collect()
            
        except Exception as e:
            print(f"Failed to process raw file {raw_fpath.name}: {e}. Retrying...")
            continue
            
        break
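
As an optional sanity check before concatenation, the MVBS files that were actually written can be listed:

generated_files = sorted(calibrated_dpath.glob("MVBS_*.nc"))
print(f"{len(generated_files)} MVBS files written to {calibrated_dpath}")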

Load and concatenate converted MVBS files#

# Lazily open all per-file MVBS outputs and combine them by coordinates
MVBS_ds_moored = xr.open_mfdataset(
    str(calibrated_dpath / 'MVBS_*.nc'),
    data_vars='minimal', coords='minimal',
    combine='by_coords'
)

MVBS_ds_moored
# Save the combined MVBS to a single netCDF file for later use
MVBS_ds_moored.to_netcdf(base_dpath / "concatenated_MVBS.nc")
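
The concatenated file can later be reopened lazily and subset before further plotting (for example with echoshader). The sketch below is illustrative only: the variable names, chunking choice, and time window are assumptions, with the window chosen around the 2017-08-21 solar eclipse mentioned above.

# Reopen the saved file lazily; chunks={} asks xarray to back it with dask arrays
mvbs_reloaded = xr.open_dataset(base_dpath / "concatenated_MVBS.nc", chunks={})

# Illustrative subset: the week around the 2017-08-21 solar eclipse
eclipse_week = mvbs_reloaded.sel(ping_time=slice("2017-08-18", "2017-08-25"))
print(eclipse_week.sizes)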