Commit 64589068 authored by Flavio Coelho's avatar Flavio Coelho

Added python scripts

parent 57cb8d89
__author__ = 'Marcelo Ferreira da Costa Gomes'
import numpy as np
import datetime
Return Brazilian epidemiological week from passed date
def extractweekday(x=datetime.datetime):
# Extract weekday as [Sun-Sat] |-> [0-6]
w = x.isoweekday() % 7 # isoweekday() returns weekday with [Mon-Sun] as [1-7]
return w
def firstepiday(year=int):
day = datetime.datetime.strptime('%s-01-01' % year, '%Y-%m-%d')
day_week = extractweekday(day)
# Whe want day1 to correspond to the first day of the first epiweek. That is, we need
# the Sunday corresponding to epiweek=%Y01
# If first day of the year is between Sunday and Wednesday, epiweek 01 includes it
# Otherwise, it is still the last epiweek of the previous year
if day_week < 4:
day = day - datetime.timedelta(days=day_week)
day = day + datetime.timedelta(days=(7-day_week))
return day
def lastepiday(year=int):
day = datetime.datetime.strptime('%s-12-31' % year, '%Y-%m-%d')
day_week = extractweekday(day)
# Whe want day to correspond to the last day of the last epiweek. That is, we need
# the corresponding Saturday
# If the last day of the year is between Sunday and Tuesday, epiweek 01 of the next year includes it.
# Otherwise, it is still the last epiweek of the current year
if day_week < 3:
day = day - datetime.timedelta(days=(day_week+1))
day = day + datetime.timedelta(days=(6-day_week))
return day
def epiweek2date(y, w):
day1 = firstepiday(y)
saturday = (day1 + datetime.timedelta(weeks=w-1)).strftime('%Y%m%d')
return saturday
def episem(x, sep='W', out='YW'):
Return Brazilian corresponding epidemiological week from x.
:param x: Input date. Can be a string in the format %Y-%m-%d or datetime.datetime
:param sep: Year and week separator.
:param out: Output format. 'YW' returns sep.join(epiyear,epiweek).
'Y' returns epiyear only. 'W' returns epiweek only.
:return: str
def out_format(year, week, out):
if out == 'YW':
return('%sW%02d' % (year,week))
if out == 'Y':
return('%s' % (year))
if out == 'W':
return ('%02d' % week)
if not isinstance(x, datetime.datetime):
x = datetime.datetime.strptime(x, '%Y-%m-%d')
epiyear = x.year
epiend = lastepiday(epiyear)
if x > epiend:
epiyear += 1
return(out_format(epiyear, 1, out))
epistart = firstepiday(epiyear)
# If current date is before its year first epiweek, then our base year is the previous one
if x < epistart:
epiyear -= 1
epistart = firstepiday(epiyear)
epiweek = int(((x - epistart)/7).days) + 1
return(out_format(epiyear, epiweek, out))
def lastepiweek(year):
# Calculate number of year's last week
return(episem(lastepiday(year), out='W'))
# -*- coding: utf-8 -*-
import networkx as nx
import pandas as pd
import numpy as np
import dask.array as da
#import dask.dataframe as dd
import geopandas as gpd
import contextily as ctx
from scipy.special import lambertw
import pylab as P
def read_flow_matrix(fname, header=0):
Read flow matrix from csv with or without headers
For CSV without header use header=None
df = pd.read_csv(fname, header=header)
return df.values
def read_nodes(fname):
Reads csv with MR names geocodes, populations and Incidences
nodes = pd.read_csv(fname)
return nodes
def plot_probs(nodes, probs, mapa, figsize=(10, 10), geocfield='CD_GEOCMI', nodes_geocfield='geoc', basemap=True,nat_breaks=False):
Plots map colored with the probability of epidemics.
mapa[geocfield] = mapa[geocfield].astype(int)
nodes['probs'] = probs
mapa = pd.merge(mapa, nodes, left_on=geocfield, right_on=nodes_geocfield)
fig, ax = P.subplots(1, 1, figsize=figsize)
mapa = mapa.to_crs(epsg=3857)
# print(mapa.columns)
if nat_breaks:
mapa.plot(ax=ax, column='probs', alpha=1, legend=True,
scheme='user_defined', classification_kwds={'bins':[0, .5, .8, 1]}
mapa.plot(ax=ax, column='probs', alpha=1, legend=True)
if basemap:
return mapa
def get_outbreaks(flowmat, incidence, R0=2.5, asymf=10, attenuate=1.0):
Calculate the probabilities of outbreak for all regions
:param flowmat: Arriving passengers row -> column
:param incidence: fraction of infectious in the populations
:param R0: Basic reproduction number
:param asymf: how many asymptomatics per reported case
:param attenuate: Attenuation factor for flow
# Adjusting arrivals by incidence
inflows = (flowmat.T * attenuate) @ incidence
probs = 1 - (1 / R0) ** (inflows * 8 * asymf)
return probs
def calc_epi_size(nodes, R0=2.5):
Calculate final epidemic size based on the SIR Model
I0 = nodes['I'].values
N = nodes['pop'].values
S0 = N - I0
S_inf = -(N*lambertw(-(R0*S0*np.exp(-(R0*(I0+S0))/N))/N))/R0
return N - S_inf
def calc_peak_size(nodes, R0=2.5):
Calculate the peak size
I0 = nodes['I'].values
N = nodes['pop'].values
S0 = N - I0
Imax = N*(1- (1 + np.log(R0))/R0)
nodes['peak_size'] = Imax
nodes['frac_prev'] = Imax/N
return nodes
def plot_ranking(nodes, probs, figsize=(10, 10), namefield='nome'):
Creates a horizontal bar plot with ranking of MRs
by probability of epidemic.
ranking = pd.DataFrame(data={'Microrregião': nodes[namefield], 'Probabilidade': probs})
ranking.set_index('Microrregião', inplace=True)
ranking.sort_values('Probabilidade', ascending=True, inplace=True)
fig, ax = P.subplots(1, 1, figsize=figsize)
return ranking
if __name__ == "__main__":
F = read_flow_matrix('flowmatrix_full.csv', header=None)
nodes = read_nodes('../Dados/nodes.csv')
mapa = gpd.read_file('microreg.gpkg')
nodes['incidence'] = (nodes.I / nodes['pop'])
probs = get_outbreaks(F, nodes.incidence)
plot_probs(nodes, probs, mapa)
ranking = plot_ranking(nodes, probs)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment