Open
Labels
bug (Something isn't working)
Description
I have used the following example
The Map vertex does not start after the upgrade. I am getting the following warning in the log:
WARN numaflow_core::shared::grpc: Failed to connect to UDS socket error=Connection("Failed to connect: transport error") socket_path="/var/run/numaflow/map.sock"
Does anybody know what is wrong?
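A quick check that could help narrow this down is to test, from inside the UDF container, whether anything is listening on the socket path from the warning. The following is a minimal sketch using only the Python standard library. The helper name `can_connect_uds` is hypothetical, and the check only covers the raw Unix socket connection, not the gRPC handshake on top of it.

```python
import socket


def can_connect_uds(path: str, timeout: float = 2.0) -> bool:
    """Return True if a Unix domain socket at `path` accepts a connection."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect(path)
        return True
    except OSError:
        # Covers a missing socket file, a refused connection, and a timeout.
        return False
    finally:
        sock.close()


if __name__ == "__main__":
    # Socket path taken from the warning in the log.
    path = "/var/run/numaflow/map.sock"
    print(path, "accepting connections:", can_connect_uds(path))
```

If this reports that the socket is not accepting connections, the Python server in the UDF container probably never created or bound the socket, which would point at the container side rather than at the numa container.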
I am testing it on Windows with Rancher Desktop and
to reproduce the bug you can use my test files:
To build the test image from the Dockerfile attached below, use:
docker build -t myregistry/myrepository/my-numa-map:0.0.1 .
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
  namespace: inqu-analytics
spec:
  limits:
    readBatchSize: 2
  vertices:
    - name: in
      source:
        # A self data generating source
        generator:
          rpu: 10
          duration: 1s
    - name: mult
      udf:
        container:
          image: myregistry/myrepository/my-numa-map:0.0.1
          imagePullPolicy: IfNotPresent
          env:
            - name: PYTHONDEBUG
              value: "true"
            - name: NUM_CPU_MULTIPROC
              value: "3" # DO NOT forget the double quotes!!!
      containerTemplate:
        resources:
          limits:
            cpu: "1"
            memory: 2Gi
          requests:
            cpu: "500m"
            memory: 1Gi
        env:
          - name: NUMAFLOW_DEBUG
            value: "true" # DO NOT forget the double quotes!!!
    - name: out
      sink:
        # A simple log printing sink
        log: {}
  edges:
    - from: in
      to: mult
    - from: mult
      to: out
import math
import os

from pynumaflow.mapper import Messages, Message, Datum, Mapper, MapMultiprocServer


def is_prime(n):
    for i in range(2, int(math.ceil(math.sqrt(n)))):
        if n % i == 0:
            return False
    else:
        return True


class PrimeMap(Mapper):
    """
    This class needs to be of type Mapper class to be used
    as a handler for the MapServer class.
    Example of a mapper that calculates if a number is prime.
    """

    def handler(self, keys: list[str], datum: Datum) -> Messages:
        val = datum.value
        _ = datum.event_time
        _ = datum.watermark
        messages = Messages()
        for i in range(2, 100000):
            is_prime(i)
        messages.append(Message(val, keys=keys))
        return messages


if __name__ == "__main__":
    """
    Example of starting a multiprocessing map vertex.
    """
    # To set the env server_count value set the env variable
    # NUM_CPU_MULTIPROC="N"
    server_count = int(os.getenv("NUM_CPU_MULTIPROC", "2"))
    prime_class = PrimeMap()
    # Server count is the number of server processes to start
    grpc_server = MapMultiprocServer(prime_class, server_count=server_count)
    grpc_server.start()
To Reproduce
#
# Docker file for Inqu Worker Image for Kubernetes Installation.
#
# Debian-based image
# -------------------
FROM python:3.10-slim-bullseye AS base-builder
RUN python -m pip install --upgrade pip
ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100
RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    \
    # install dumb-init
    && wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
    && chmod +x /dumb-init | python3 -
RUN pip3 install pynumaflow==0.10.0
COPY . ./
ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "chmod +x ./mapper.py && python ./mapper.py"]
EXPOSE 5000