Avro
AvroPool
AvroPool (recipe:configparser.ConfigParser, query:tspace.data.core.PoolQuery, meta:tspace.data.core.ObservationMeta, pl_path:Optional[pathlib.Path]=None, logger:Optional[logging.Logger]=None, dict_logger:Optional[dict]=None, dbg:Optional[dask.bag.core.Bag]=None, dbg_schema:Optional[dict]=None, _cnt:int=0)
*AvroPool is the Avro storage for pooling real-time data from the cloud.

Features:

- Designed to support a large local data pool, with buffer capacity bounded only by local system storage.
- Uses Dask Bag to store the data in memory and Dask DataFrame to process the data.
- Meta information is stored in the Avro metadata of each Avro file.
- Sampling random episodes needs some care to ensure randomness; Dask Delayed is used to parallelize data processing such as sampling.

Attributes:

- dbg: Dask Bag of episodes
- dbg_schema: schema for Avro file decoding*
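The design above can be illustrated with plain Dask. A minimal standalone sketch (not the tspace API itself), where the record keys `episode`, `timestamp`, and `speed` are made up for illustration:

```python
import dask.bag as db

# Each bag item stands in for one decoded episode (a list of records).
episodes = [
    [{"episode": 0, "timestamp": 0, "speed": 10.0},
     {"episode": 0, "timestamp": 1, "speed": 12.0}],
    [{"episode": 1, "timestamp": 0, "speed": 9.5}],
]
dbg = db.from_sequence(episodes)

# Flatten the episode records and hand them to Dask DataFrame for processing.
ddf = dbg.flatten().to_dataframe()
print(ddf.compute())
```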
AvroPool.__post_init__
AvroPool.__post_init__ ()
Set up the logger, run the post-init of DaskPool, and load the pool.
AvroPool.find
AvroPool.find (query:tspace.data.core.PoolQuery)
*Find records by the PoolQuery object. The downstream task can use pandas DataFrame unique() on the index to get unique episodes.

Arg: query: PoolQuery object

Return: A multi-indexed DataFrame with all episodes in the query range.*
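A standalone illustration of the multi-indexed return shape and the unique() pattern the docstring mentions; the level names `episode` and `timestamp` are assumptions, not tspace's actual index schema:

```python
import pandas as pd

# Mimic the multi-indexed DataFrame that find() returns.
idx = pd.MultiIndex.from_tuples(
    [(0, 0), (0, 1), (1, 0)], names=["episode", "timestamp"]
)
df = pd.DataFrame({"speed": [10.0, 12.0, 9.5]}, index=idx)

# unique() on an index level yields the distinct episodes in the result.
for ep in df.index.get_level_values("episode").unique():
    print(df.xs(ep, level="episode"))
```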
AvroPool.load
AvroPool.load ()
Load EPISODE arrays from Avro files in the folder specified by the recipe.
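A hedged sketch of what such loading can look like with fastavro and Dask Bag, assuming one Avro file per episode; the real AvroPool.load reads the folder from the recipe and may differ in detail:

```python
from pathlib import Path

import dask.bag as db
from fastavro import reader

def read_episode(path: Path) -> list:
    """Read one Avro file back as one episode (a list of records)."""
    with open(path, "rb") as fo:
        avro_reader = reader(fo)
        _meta = avro_reader.metadata  # per-file meta information, as noted above
        return list(avro_reader)

folder = Path("./avro_pool")  # assumed; the recipe supplies the real path
dbg = db.from_sequence(read_episode(p) for p in sorted(folder.glob("*.avro")))
```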
AvroPool.close
AvroPool.close ()
Close the pool.
AvroPool.sample
AvroPool.sample (size:int=4, query:Optional[tspace.data.core.PoolQuery]=None)
*Sample a batch of episodes from the Apache Avro pool.

Since the return value is a DataFrame, downstream tasks can use pandas DataFrame unique() on the index to extract single episodes; therefore, decoding into a DataFrame has to be done in the Avro pool.

Args: size: number of episodes to sample; query: PoolQuery object

Return: A DataFrame with all episodes*
| | Type | Default | Details |
|---|---|---|---|
| size | int | 4 | desired size of the samples |
| query | Optional[PoolQuery] | None | query for sampling |
| Returns | pd.DataFrame | | |
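A hedged sketch of the sampling pattern, reusing the `dbg` bag from the first sketch; the index keys are assumptions, and the real implementation parallelizes the work with Dask Delayed rather than computing the whole bag:

```python
import random

import pandas as pd

def sample_episodes(dbg, size: int = 4) -> pd.DataFrame:
    """Pick random episodes and decode them into one multi-indexed DataFrame."""
    episodes = dbg.compute()  # list of episodes, each a list of records
    chosen = random.sample(episodes, k=min(size, len(episodes)))
    frames = [
        pd.DataFrame(ep).set_index(["episode", "timestamp"])  # assumed keys
        for ep in chosen
    ]
    return pd.concat(frames)

batch = sample_episodes(dbg, size=2)
print(batch.index.get_level_values("episode").unique())
```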
AvroPool.store
AvroPool.store (episode:pandas.core.frame.DataFrame)
Deposit an episode as a single item into the Avro pool.
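A hedged sketch of depositing one episode as one Avro file with fastavro; the schema and the metadata key below are illustrative, not tspace's real schema:

```python
import pandas as pd
from fastavro import parse_schema, writer

# Illustrative schema matching the made-up record keys used above.
SCHEMA = parse_schema({
    "name": "Record",
    "type": "record",
    "fields": [
        {"name": "episode", "type": "int"},
        {"name": "timestamp", "type": "int"},
        {"name": "speed", "type": "double"},
    ],
})

def store_episode(episode: pd.DataFrame, path: str) -> None:
    """Write one multi-indexed episode DataFrame to one Avro file."""
    records = episode.reset_index().to_dict("records")
    with open(path, "wb") as fo:
        # Meta information travels in the Avro file metadata.
        writer(fo, SCHEMA, records, metadata={"pool": "avro"})
```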
AvroPool.get_query
AvroPool.get_query (query:Optional[tspace.data.core.PoolQuery]=None)
*Get the query result from the Dask Bag.

Arg: query: PoolQuery object

Return: a Dask Bag with all episodes in the query range*
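A hedged sketch of what evaluating a query against the bag could look like, again with the `dbg` bag from the first sketch; the timestamp predicate is a stand-in for the PoolQuery fields:

```python
def in_range(episode, start=0, end=10):
    """True if every record of the episode falls inside the time range."""
    return all(start <= rec["timestamp"] <= end for rec in episode)

matched = dbg.filter(in_range)  # still a lazy Dask Bag
print(matched.count().compute())
```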
AvroPool.remove_episode
AvroPool.remove_episode (query:tspace.data.core.PoolQuery)
*Remove episodes matching the query from the bag, but not from the Avro files!

Deletes all episodes in the query range and modifies the bag in place.

Arg: query: PoolQuery object

Return: None*
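The caveat above can be mimicked with Bag.remove: a hedged sketch that drops matching episodes from the in-memory bag while the files on disk stay untouched (the predicate stands in for the PoolQuery):

```python
def matches_query(episode) -> bool:
    return episode[0]["episode"] == 1  # stand-in for the PoolQuery predicate

# Drop matching episodes from the in-memory bag; the Avro files remain on disk.
dbg = dbg.remove(matches_query)
```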