|
7 | 7 | def willr(
|
8 | 8 | data: Union[pd.DataFrame, pl.DataFrame],
|
9 | 9 | period: int = 14,
|
10 |
| - result_column: str = None, |
| 10 | + result_column: str = "WILLR", |
11 | 11 | high_column: str = "High",
|
12 | 12 | low_column: str = "Low",
|
13 | 13 | close_column: str = "Close"
|
14 | 14 | ) -> Union[pd.DataFrame, pl.DataFrame]:
|
15 |
| - """ |
16 |
| - Function to calculate the Williams %R indicator of a series. |
17 | 15 |
|
18 |
| - Args: |
19 |
| - data (Union[pd.DataFrame, pl.DataFrame]): The input data. |
20 |
| - source_column (str): The name of the series. |
21 |
| - period (int): The period for the Williams %R calculation. |
22 |
| - result_column (str, optional): The name of the column to store |
23 |
| - the Williams %R values. Defaults to None, which means it will |
24 |
| - be named "WilliamsR_{period}". |
25 |
| -
|
26 |
| - Returns: |
27 |
| - Union[pd.DataFrame, pl.DataFrame]: The DataFrame with |
28 |
| - the Williams %R column added. |
29 |
| - """ |
30 |
| - |
31 |
| - # Check if the high and low columns are present |
32 | 16 | if high_column not in data.columns:
|
33 |
| - raise PyIndicatorException( |
34 |
| - f"Column '{high_column}' not found in DataFrame" |
35 |
| - ) |
| 17 | + raise PyIndicatorException(f"Column '{high_column}' not found in DataFrame") |
36 | 18 |
|
37 | 19 | if low_column not in data.columns:
|
38 |
| - raise PyIndicatorException( |
39 |
| - f"Column '{low_column}' not found in DataFrame" |
40 |
| - ) |
| 20 | + raise PyIndicatorException(f"Column '{low_column}' not found in DataFrame") |
41 | 21 |
|
42 | 22 | if isinstance(data, pd.DataFrame):
|
43 |
| - data["high_n"] = data[high_column]\ |
44 |
| - .rolling(window=period, min_periods=1).max() |
45 |
| - data["low_n"] = data[low_column]\ |
46 |
| - .rolling(window=period, min_periods=1).min() |
| 23 | + data["high_n"] = data[high_column].rolling(window=period, min_periods=1).max() |
| 24 | + data["low_n"] = data[low_column].rolling(window=period, min_periods=1).min() |
| 25 | + |
47 | 26 | data[result_column] = ((data["high_n"] - data[close_column]) /
|
48 | 27 | (data["high_n"] - data["low_n"])) * -100
|
| 28 | + |
| 29 | + # Set the first `period` rows to 0 using .iloc |
| 30 | + if not data.empty: |
| 31 | + data.iloc[:period - 1, data.columns.get_loc(result_column)] = 0 |
| 32 | + |
49 | 33 | return data.drop(columns=["high_n", "low_n"])
|
50 | 34 |
|
51 | 35 | elif isinstance(data, pl.DataFrame):
|
52 |
| - high_n = data.select(pl.col(high_column).rolling_max(period) |
53 |
| - .alias("high_n")) |
54 |
| - low_n = data.select(pl.col(low_column).rolling_min(period) |
55 |
| - .alias("low_n")) |
| 36 | + high_n = data.select(pl.col(high_column).rolling_max(period).alias("high_n")) |
| 37 | + low_n = data.select(pl.col(low_column).rolling_min(period).alias("low_n")) |
56 | 38 |
|
57 | 39 | data = data.with_columns([
|
58 | 40 | high_n["high_n"],
|
59 | 41 | low_n["low_n"]
|
60 | 42 | ])
|
| 43 | + |
61 | 44 | data = data.with_columns(
|
62 |
| - ((pl.col("high_n") - pl.col(close_column)) / |
63 |
| - (pl.col("high_n") - pl.col("low_n")) * -100).alias(result_column) |
| 45 | + ((pl.col("high_n") - pl.col(close_column)) / (pl.col("high_n") - pl.col("low_n")) * -100) |
| 46 | + .alias(result_column) |
64 | 47 | )
|
65 |
| - return data.drop(["high_n", "low_n"]) |
66 | 48 |
|
| 49 | + # Set the first `period` rows of result_column to 0 directly in Polars |
| 50 | + if data.height > 0: |
| 51 | + zero_values = [0] * (period - 1)+ data[result_column].to_list()[period - 1:] |
| 52 | + data = data.with_columns(pl.Series(result_column, zero_values)) |
| 53 | + |
| 54 | + return data.drop(["high_n", "low_n"]) |
67 | 55 | else:
|
68 | 56 | raise PyIndicatorException(
|
69 | 57 | "Unsupported data type. Must be pandas or polars DataFrame."
|
|
0 commit comments