Avro

AvroPool class

source

AvroPool

 AvroPool (recipe:configparser.ConfigParser,
           query:tspace.data.core.PoolQuery,
           meta:tspace.data.core.ObservationMeta,
           pl_path:Optional[pathlib.Path]=None,
           logger:Optional[logging.Logger]=None,
           dict_logger:Optional[dict]=None,
           dbg:Optional[dask.bag.core.Bag]=None,
           dbg_schema:Optional[dict]=None, _cnt:int=0)

*AvroPool is the avro storage for pooling the real-time data from the cloud.

Features:

- It supports a large local data pool, with buffer capacity bounded only by local system storage.
- It uses Dask Bag to store the data in memory and Dask DataFrame to process the data.
- Meta information is stored in the avro metadata of each avro file.
- Sampling random episodes needs some care to ensure randomness. It uses Dask Delayed to parallelize the data processing such as sampling.

Attributes:

- dbg: Dask Bag of episodes
- dbg_schema: schema for avro file decoding*
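A minimal construction sketch under assumed inputs; the PoolQuery field names, the recipe file layout, and the ObservationMeta construction below are illustrative assumptions, not confirmed by the source:

```python
import configparser
import logging
from pathlib import Path

from tspace.data.core import ObservationMeta, PoolQuery

recipe = configparser.ConfigParser()
recipe.read("pool.ini")  # assumed recipe file naming the avro folder

pool = AvroPool(
    recipe=recipe,
    query=PoolQuery(vehicle="VB7", driver="wang-kai"),  # hypothetical fields
    meta=ObservationMeta(),  # fill with the actual observation spec
    pl_path=Path("./avro_pool"),  # assumed pool folder
    logger=logging.getLogger(__name__),
)
```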


source

AvroPool.__post_init__

 AvroPool.__post_init__ ()

Set up the logger, run the post-init of DaskPool, and load the pool.


source

AvroPool.find

 AvroPool.find (query:tspace.data.core.PoolQuery)

*Find records by the PoolQuery object. Downstream tasks can use pandas DataFrame unique() on the index to get unique episodes.

Arg: query: PoolQuery object

Return: A multi-indexed DataFrame with all episodes in the query range.*
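A sketch of how a downstream task might split the multi-indexed result into single episodes; the choice of level 0 as the episode key is an assumption:

```python
df = pool.find(PoolQuery(vehicle="VB7", driver="wang-kai"))  # hypothetical fields

# unique() on the episode index level yields one key per episode
for key in df.index.get_level_values(0).unique():
    episode = df.xs(key, level=0, drop_level=False)
```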


source

AvroPool.load

 AvroPool.load ()

Load EPISODE arrays from the avro files in the folder specified by the recipe.
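Roughly, loading amounts to reading every avro file in the pool folder into one Dask Bag; a sketch with plain Dask (the recipe key and folder layout are assumptions, not the actual source):

```python
import dask.bag as db

folder = recipe["DEFAULT"]["data_folder"]  # assumed recipe key
dbg = db.read_avro(f"{folder}/*.avro")     # one record per stored episode
print(dbg.count().compute())               # number of episodes in the pool
```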


source

AvroPool.close

 AvroPool.close ()

Close the pool.


source

AvroPool.sample

 AvroPool.sample (size:int=4,
                  query:Optional[tspace.data.core.PoolQuery]=None)

*Sample a batch of episodes from the Apache Avro pool.

Since the return value is a DataFrame, downstream tasks can use pandas DataFrame unique() on the index to extract single episodes. Therefore, decoding to a DataFrame has to be done in the avro pool.

Args: size: number of episodes to sample; query: PoolQuery object

Return: A DataFrame with all sampled episodes*

|  | Type | Default | Details |
|---|---|---|---|
| size | int | 4 | desired size of the samples |
| query | Optional[PoolQuery] | None | query for sampling |
| **Returns** | **pd.DataFrame** | | |
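A sampling sketch; as with find(), treating index level 0 as the episode key is an assumption:

```python
batch = pool.sample(size=4, query=query)  # decoded to a DataFrame inside the pool

episodes = [
    batch.xs(key, level=0, drop_level=False)  # level 0 assumed to be the episode key
    for key in batch.index.get_level_values(0).unique()
]
assert len(episodes) <= 4
```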

source

AvroPool.store

 AvroPool.store (episode:pandas.core.frame.DataFrame)

Deposit an episode as a single item into avro.
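A sketch of depositing one episode; the column names and the multi-index layout are illustrative assumptions:

```python
import pandas as pd

episode = pd.DataFrame(
    {"velocity": [0.0, 1.2, 2.3], "thrust": [0.1, 0.4, 0.5]},
    index=pd.MultiIndex.from_product(
        [["2024-01-01T08:00:00"], [0, 1, 2]],
        names=["episodestart", "step"],  # assumed index levels
    ),
)
pool.store(episode)  # serialized as a single avro item
```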


source

AvroPool.get_query

 AvroPool.get_query (query:Optional[tspace.data.core.PoolQuery]=None)

*Get the query result from the Dask DataFrame.

Arg: query: PoolQuery object

Return: a Dask Bag with all episodes in the query range*
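Since the return value is a lazy Dask Bag, nothing is materialized until compute(); a short usage sketch:

```python
bag = pool.get_query(query)
if bag is not None:
    records = bag.compute()  # materialize the matching episode records
```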


source

AvroPool.remove_episode

 AvroPool.remove_episode (query:tspace.data.core.PoolQuery)

*Remove episodes matching the query from the bag, but not from the avro files!

Delete all episodes in the query range, modifying the bag in place.

Arg: query: PoolQuery object

Return: None*
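Because removal only touches the in-memory bag, the avro files survive and a subsequent load() restores the removed episodes; a sketch (hypothetical query fields):

```python
pool.remove_episode(PoolQuery(vehicle="VB7", driver="wang-kai"))

pool.load()  # re-reads the avro files; the removed episodes reappear
```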