
Commit 11ebb77

Authored by renovate-bot, gcf-owl-bot[bot], and davidcavazos
chore(deps): update apache/beam_python3.11_sdk docker tag to v2.59.0 (#12550)
* chore(deps): update apache/beam_python3.11_sdk docker tag to v2.59.0
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* match requirement version to docker version
* use beam 2.58 since 2.59 is not out yet

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: David Cavazos <dcavazos@google.com>
1 parent 3d5ea06 commit 11ebb77

File tree

8 files changed (+53, -54 lines)


dataflow/snippets/Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ FROM ubuntu:focal
 
 WORKDIR /pipeline
 
-COPY --from=apache/beam_python3.11_sdk:2.57.0 /opt/apache/beam /opt/apache/beam
+COPY --from=apache/beam_python3.11_sdk:2.58.0 /opt/apache/beam /opt/apache/beam
 ENTRYPOINT [ "/opt/apache/beam/boot" ]
 
 COPY requirements.txt .
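The COPY --from line vendors the Apache Beam SDK out of the official SDK image, and the boot entrypoint lets Dataflow run this image as a custom SDK container. A minimal sketch of how such an image might be referenced when launching a pipeline; the project, region, bucket, and image path below are placeholders, not values from this repository:

# Sketch only: point Dataflow at a custom SDK container built from a
# Dockerfile like the one above. All resource names here are placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/tmp",
    sdk_container_image="us-docker.pkg.dev/my-project/repo/kafka-pipeline:latest",
)

with beam.Pipeline(options=options) as pipeline:
    pipeline | "Create" >> beam.Create(["hello"]) | "Print" >> beam.Map(print)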

dataflow/snippets/batch_write_storage.py

Lines changed: 3 additions & 1 deletion
@@ -24,7 +24,7 @@
 from typing_extensions import Self
 
 
-def write_to_cloud_storage(argv : List[str] = None) -> None:
+def write_to_cloud_storage(argv: List[str] = None) -> None:
     # Parse the pipeline options passed into the application.
     class MyOptions(PipelineOptions):
         @classmethod
@@ -41,6 +41,8 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
             | "Create elements" >> beam.Create(wordsList)
             | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
         )
+
+
 # [END dataflow_batch_write_to_storage]
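For context, the snippet is driven entirely by the --output pipeline option, so it can be invoked the same way the test further down does; a minimal sketch, assuming the module is importable from the dataflow/snippets directory and using a placeholder bucket:

# Sketch: invoke the snippet directly; the bucket name is a placeholder.
import sys

from batch_write_storage import write_to_cloud_storage

sys.argv = ["", "--output=gs://my-bucket/output/out-"]
write_to_cloud_storage()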

dataflow/snippets/read_kafka.py

Lines changed: 9 additions & 11 deletions
@@ -25,7 +25,6 @@
 
 
 def read_from_kafka() -> None:
-
     # Parse the pipeline options passed into the application. Example:
     # --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
     # --output=$CLOUD_STORAGE_BUCKET --streaming
@@ -34,34 +33,33 @@ def read_from_kafka() -> None:
     class MyOptions(PipelineOptions):
         @staticmethod
         def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
-            parser.add_argument('--topic')
-            parser.add_argument('--bootstrap_server')
-            parser.add_argument('--output')
+            parser.add_argument("--topic")
+            parser.add_argument("--bootstrap_server")
+            parser.add_argument("--output")
 
     options = MyOptions()
     with beam.Pipeline(options=options) as pipeline:
         (
             pipeline
             # Read messages from an Apache Kafka topic.
             | ReadFromKafka(
-                consumer_config={
-                    "bootstrap.servers": options.bootstrap_server
-                },
+                consumer_config={"bootstrap.servers": options.bootstrap_server},
                 topics=[options.topic],
                 with_metadata=False,
                 max_num_records=5,
-                start_read_time=0
+                start_read_time=0,
             )
             # The previous step creates a key-value collection, keyed by message ID.
             # The values are the message payloads.
             | beam.Values()
             # Subdivide the output into fixed 5-second windows.
             | beam.WindowInto(window.FixedWindows(5))
             | WriteToText(
-                file_path_prefix=options.output,
-                file_name_suffix='.txt',
-                num_shards=1)
+                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
+            )
         )
+
+
 # [END dataflow_kafka_read]
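Because the read is bounded by max_num_records=5, the pipeline drains after five messages. Those messages can be published with kafka-python, much as the test file below does; a minimal sketch, assuming a local broker and an existing topic (both names are placeholders):

# Sketch: publish a few test messages for the pipeline to consume.
# The broker address and topic name are placeholders.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(5):
    producer.send("my-topic", f"event-{i}".encode())
producer.flush()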

dataflow/snippets/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
-apache-beam[gcp]==2.50.0
+apache-beam[gcp]==2.58.0
 kafka-python==2.0.2
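Per the commit message, the apache-beam pin is kept in step with the SDK container tag in the Dockerfile (both 2.58.0, since 2.59.0 had not been released yet). A purely illustrative sanity check of the installed version:

# Illustrative check: the installed Beam version should match the
# apache/beam_python3.11_sdk tag used in the Dockerfile.
import apache_beam

assert apache_beam.__version__ == "2.58.0", apache_beam.__version__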

dataflow/snippets/tests/test_batch_write_storage.py

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@
 from ..batch_write_storage import write_to_cloud_storage
 
 
-bucket_name = f'test-bucket-{uuid.uuid4()}'
+bucket_name = f"test-bucket-{uuid.uuid4()}"
 storage_client = storage.Client()
 
 
@@ -39,7 +39,7 @@ def setup_and_teardown() -> None:
 
 
 def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
-    sys.argv = ['', f'--output=gs://{bucket_name}/output/out-']
+    sys.argv = ["", f"--output=gs://{bucket_name}/output/out-"]
     write_to_cloud_storage()
 
     blobs = list(storage_client.list_blobs(bucket_name))

dataflow/snippets/tests/test_read_kafka.py

Lines changed: 21 additions & 17 deletions
@@ -25,16 +25,18 @@
 import pytest
 
 
-BOOTSTRAP_SERVER = 'localhost:9092'
-TOPIC_NAME = f'topic-{uuid.uuid4()}'
-CONTAINER_IMAGE_NAME = 'kafka-pipeline:1'
+BOOTSTRAP_SERVER = "localhost:9092"
+TOPIC_NAME = f"topic-{uuid.uuid4()}"
+CONTAINER_IMAGE_NAME = "kafka-pipeline:1"
 
 
-@pytest.fixture(scope='module', autouse=True)
+@pytest.fixture(scope="module", autouse=True)
 def kafka_container() -> None:
     # Start a containerized Kafka server.
     docker_client = docker.from_env()
-    container = docker_client.containers.run('apache/kafka:3.7.0', network_mode='host', detach=True)
+    container = docker_client.containers.run(
+        "apache/kafka:3.7.0", network_mode="host", detach=True
+    )
     try:
         create_topic()
         yield
@@ -48,41 +50,43 @@ def create_topic() -> None:
         try:
             client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVER)
             topics = []
-            topics.append(NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1))
+            topics.append(
+                NewTopic(name=TOPIC_NAME, num_partitions=1, replication_factor=1)
+            )
             client.create_topics(topics)
             break
         except NoBrokersAvailable:
             time.sleep(5)
 
 
 def test_read_from_kafka(tmp_path: Path) -> None:
-
-    file_name_prefix = f'output-{uuid.uuid4()}'
-    file_name = f'{tmp_path}/{file_name_prefix}-00000-of-00001.txt'
+    file_name_prefix = f"output-{uuid.uuid4()}"
+    file_name = f"{tmp_path}/{file_name_prefix}-00000-of-00001.txt"
 
     # Send some messages to Kafka
     producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER)
     for i in range(0, 5):
-        message = f'event-{i}'
+        message = f"event-{i}"
         producer.send(TOPIC_NAME, message.encode())
 
     # Build a container image for the pipeline.
     client = docker.from_env()
-    client.images.build(path='./', tag=CONTAINER_IMAGE_NAME)
+    client.images.build(path="./", tag=CONTAINER_IMAGE_NAME)
 
     # Run the pipeline.
     client.containers.run(
         image=CONTAINER_IMAGE_NAME,
-        command=f'/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}',
-        volumes=['/var/run/docker.sock:/var/run/docker.sock', f'{tmp_path}/:/out'],
-        network_mode='host',
-        entrypoint='python')
+        command=f"/pipeline/read_kafka.py --output /out/{file_name_prefix} --bootstrap_server {BOOTSTRAP_SERVER} --topic {TOPIC_NAME}",
+        volumes=["/var/run/docker.sock:/var/run/docker.sock", f"{tmp_path}/:/out"],
+        network_mode="host",
+        entrypoint="python",
+    )
 
     # Verify the pipeline wrote the Kafka messages to the output file.
-    with open(file_name, 'r') as f:
+    with open(file_name, "r") as f:
         text = f.read()
         for i in range(0, 5):
-            assert f'event-{i}' in text
+            assert f"event-{i}" in text
 
 
 if __name__ == "__main__":

dataflow/snippets/tests/test_write_pubsub.py

Lines changed: 6 additions & 7 deletions
@@ -25,8 +25,8 @@
 from ..write_pubsub import write_to_pubsub
 
 
-topic_id = f'test-topic-{uuid.uuid4()}'
-subscription_id = f'{topic_id}-sub'
+topic_id = f"test-topic-{uuid.uuid4()}"
+subscription_id = f"{topic_id}-sub"
 project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
 
 publisher = pubsub_v1.PublisherClient()
@@ -48,8 +48,7 @@ def setup_and_teardown() -> None:
         )
         yield
     finally:
-        subscriber.delete_subscription(
-            request={"subscription": subscription_path})
+        subscriber.delete_subscription(request={"subscription": subscription_path})
         publisher.delete_topic(request={"topic": topic_path})
 
 
@@ -76,7 +75,7 @@ def read_messages() -> None:
             request={"subscription": subscription_path, "ack_ids": ack_ids}
         )
 
-        if (len(received_messages) >= NUM_MESSAGES):
+        if len(received_messages) >= NUM_MESSAGES:
             break
 
         time.sleep(5)
@@ -86,10 +85,10 @@ def read_messages() -> None:
 
 
 def test_write_to_pubsub(setup_and_teardown: None) -> None:
     topic_path = publisher.topic_path(project_id, topic_id)
-    with patch("sys.argv", ["", '--streaming', f'--topic={topic_path}']):
+    with patch("sys.argv", ["", "--streaming", f"--topic={topic_path}"]):
         write_to_pubsub()
 
     # Read from Pub/Sub to verify the pipeline successfully wrote messages.
     # Duplicate reads are possible.
     messages = read_messages()
-    assert (len(messages) >= NUM_MESSAGES)
+    assert len(messages) >= NUM_MESSAGES

dataflow/snippets/write_pubsub.py

Lines changed: 10 additions & 14 deletions
@@ -32,17 +32,13 @@ def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
     # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
     from apache_beam.io import PubsubMessage
 
-    attributes = {
-        'buyer': item['name'],
-        'timestamp': str(item['ts'])
-    }
-    data = bytes(item['product'], 'utf-8')
+    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
+    data = bytes(item["product"], "utf-8")
 
     return PubsubMessage(data=data, attributes=attributes)
 
 
 def write_to_pubsub(argv: List[str] = None) -> None:
-
     # Parse the pipeline options passed into the application. Example:
     # --topic=$TOPIC_PATH --streaming
     # For more information, see
@@ -54,10 +50,10 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
             parser.add_argument("--topic", required=True)
 
     example_data = [
-        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
-        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
-        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
-        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
+        {"name": "Robert", "product": "TV", "ts": 1613141590000},
+        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
+        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
+        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
     ]
     options = MyOptions()
 
@@ -66,12 +62,12 @@ def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
             pipeline
             | "Create elements" >> beam.Create(example_data)
             | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
-            | WriteToPubSub(
-                topic=options.topic,
-                with_attributes=True)
+            | WriteToPubSub(topic=options.topic, with_attributes=True)
         )
 
-    print('Pipeline ran successfully.')
+    print("Pipeline ran successfully.")
+
+
 # [END dataflow_pubsub_write_with_attributes]
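As a quick illustration of what the reformatted item_to_message helper produces (not part of this change), it can be called directly on one of the example records; this assumes the module is importable from the dataflow/snippets directory:

# Sketch: inspect the PubsubMessage built from one example record.
from write_pubsub import item_to_message

msg = item_to_message({"name": "Robert", "product": "TV", "ts": 1613141590000})
print(msg.data)        # b'TV'
print(msg.attributes)  # {'buyer': 'Robert', 'timestamp': '1613141590000'}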
