6
6
import uuid
7
7
from functools import partial
8
8
from typing import Callable , Literal , Optional , Union
9
+ from urllib .parse import quote_plus as qp
9
10
10
11
import bson
11
12
@@ -65,9 +66,12 @@ def _default_document_matcher(record: SinkItem) -> MongoQueryFilter:
65
66
class MongoDBSink (BatchingSink ):
66
67
def __init__ (
67
68
self ,
68
- url : str ,
69
+ host : str ,
69
70
db : str ,
70
71
collection : str ,
72
+ username : Optional [str ] = None ,
73
+ password : Optional [str ] = None ,
74
+ port : int = 27017 ,
71
75
document_matcher : Callable [
72
76
[SinkItem ], MongoQueryFilter
73
77
] = _default_document_matcher ,
@@ -82,9 +86,12 @@ def __init__(
82
86
"""
83
87
A connector to sink processed data to MongoDB in batches.
84
88
85
- :param url : MongoDB url; most commonly `mongodb://username:password@host:port`
89
+ :param host : MongoDB hostname; example "localhost"
86
90
:param db: MongoDB database name
87
91
:param collection: MongoDB collection name
92
+ :param username: username, if authentication is required
93
+ :param password: password, if authentication is required
94
+ :param port: port used by MongoDB host if not using the default of 27017
88
95
:param document_matcher: How documents are selected to update.
89
96
A callable that accepts a `BatchItem` and returns a MongoDB "query filter".
90
97
If no match, will insert if `upsert=True`, where `_id` will be either the
@@ -107,9 +114,13 @@ def __init__(
107
114
NOTE: metadata is added before this step, so don't accidentally
108
115
exclude it here!
109
116
"""
110
-
111
117
super ().__init__ ()
112
- self ._url = url
118
+ auth_stub = f"{ qp (username )} :{ qp (password )} @" if username else ""
119
+ self ._client_kwargs = {
120
+ "host" : f"mongodb://{ auth_stub } { host } " ,
121
+ "port" : port ,
122
+ ** kwargs ,
123
+ }
113
124
self ._db_name = db
114
125
self ._collection_name = collection
115
126
self ._document_matcher = document_matcher
@@ -119,13 +130,11 @@ def __init__(
119
130
self ._add_topic_metadata = add_topic_metadata
120
131
self ._value_selector = value_selector
121
132
self ._auth_timeout_ms = authentication_timeout_ms
122
- self ._client_kwargs = kwargs
123
133
self ._client : Optional [MongoClient ] = None
124
134
self ._collection : Optional [Collection ] = None
125
135
126
136
def setup (self ):
127
137
self ._client = MongoClient (
128
- self ._url ,
129
138
serverSelectionTimeoutMS = self ._auth_timeout_ms ,
130
139
** self ._client_kwargs ,
131
140
)
0 commit comments