|
1 | 1 | #!/usr/bin/env python
|
2 | 2 |
|
3 | 3 | import pigpio
|
| 4 | +import threading |
4 | 5 |
|
5 | 6 | class RotaryDecoder:
|
6 | 7 |
|
7 |
| - """ Class to decode mechanical rotary encoder pulses """ |
8 |
| - |
9 |
| - def __init__(self, pi, feedback_pin_A, feedback_pin_B, callback): |
10 |
| - |
11 |
| - self._pi = pi |
12 |
| - self._feedback_pin_A = feedback_pin_A # encoder feedback pin A |
13 |
| - self._feedback_pin_B = feedback_pin_B # encoder feedback pin B |
14 |
| - self._callback = callback # callback function on event |
15 |
| - self._direction = 0 # direction, forward = 1, backward = -1, steady = 0 |
16 |
| - |
17 |
| - self._levelA = 0 # value of encoder feedback pin A |
18 |
| - self._levelB = 0 # value of encoder feedback pin B |
19 |
| - |
20 |
| - # setting up GPIO |
21 |
| - self._pi.set_mode(feedback_pin_A, pigpio.INPUT) |
22 |
| - self._pi.set_mode(feedback_pin_B, pigpio.INPUT) |
23 |
| - self._pi.set_pull_up_down(feedback_pin_A, pigpio.PUD_UP) |
24 |
| - self._pi.set_pull_up_down(feedback_pin_B, pigpio.PUD_UP) |
25 |
| - |
26 |
| - # callback function on EITHER_EDGE for each pin |
27 |
| - self._callback_triggerA = self._pi.callback(feedback_pin_A, pigpio.EITHER_EDGE, self._pulse) |
28 |
| - self._callback_triggerB = self._pi.callback(feedback_pin_B, pigpio.EITHER_EDGE, self._pulse) |
29 |
| - |
30 |
| - self._lastGpio = None |
31 |
| - |
32 |
| - """ pulse is the callback function on EITHER_EDGE |
33 |
| - We have two feedback input from pin A and B (two train waves) |
34 |
| - it returns a 1 if the square waves have A leading B because we're moving forward |
35 |
| - It returns a -1 if the square waves have B leading A because we're moving backwards |
36 |
| - In either case, A is staggered from B by (+-)pi/2 radiants |
37 |
| - Note: level = 0 falling edge |
38 |
| - 1 raising edge |
39 |
| - 2 from watchdog |
| 8 | + """ Class to decode mechanical rotary encoder pulses """ |
| 9 | + |
| 10 | + def __init__(self, pi, feedback_pin_A, feedback_pin_B, callback): |
| 11 | + |
| 12 | + self._pi = pi |
| 13 | + self._feedback_pin_A = feedback_pin_A # encoder feedback pin A |
| 14 | + self._feedback_pin_B = feedback_pin_B # encoder feedback pin B |
| 15 | + self._callback = callback # callback function on event |
| 16 | + self._direction = 0 # direction, forward = 1, backward = -1, steady = 0 |
| 17 | + |
| 18 | + self._levelA = 0 # value of encoder feedback pin A |
| 19 | + self._levelB = 0 # value of encoder feedback pin B |
| 20 | + |
| 21 | + self._lock = threading.RLock() |
| 22 | + |
| 23 | + # setting up GPIO |
| 24 | + self._pi.set_mode(feedback_pin_A, pigpio.INPUT) |
| 25 | + self._pi.set_mode(feedback_pin_B, pigpio.INPUT) |
| 26 | + self._pi.set_pull_up_down(feedback_pin_A, pigpio.PUD_UP) |
| 27 | + self._pi.set_pull_up_down(feedback_pin_B, pigpio.PUD_UP) |
| 28 | + |
| 29 | + # callback function on EITHER_EDGE for each pin |
| 30 | + self._callback_triggerA = self._pi.callback(feedback_pin_A, pigpio.EITHER_EDGE, self._pulse) |
| 31 | + self._callback_triggerB = self._pi.callback(feedback_pin_B, pigpio.EITHER_EDGE, self._pulse) |
| 32 | + |
| 33 | + self._lastGpio = None |
| 34 | + |
| 35 | + """ pulse is the callback function on EITHER_EDGE |
| 36 | + We have two feedback input from pin A and B (two train waves) |
| 37 | + it returns a 1 if the square waves have A leading B because we're moving forward |
| 38 | + It returns a -1 if the square waves have B leading A because we're moving backwards |
| 39 | + In either case, A is staggered from B by (+-)pi/2 radiants |
| 40 | + Note: level = 0 falling edge |
| 41 | + 1 raising edge |
| 42 | + 2 from watchdog |
40 | 43 |
|
41 |
| - +---------+ +---------+ 0 |
42 |
| - | | | | |
43 |
| - B | | | | |
44 |
| - | | | | |
45 |
| - +---------+ +---------+ +----- 1 # B leading A |
46 |
| - +---------+ +---------+ 0 # forward |
47 |
| - | | | | |
48 |
| - A | | | | |
49 |
| - | | | | |
50 |
| - ----+ +---------+ +----------+ 1 |
| 44 | + +---------+ +---------+ 0 |
| 45 | + | | | | |
| 46 | + B | | | | |
| 47 | + | | | | |
| 48 | + +---------+ +---------+ +----- 1 # B leading A |
| 49 | + +---------+ +---------+ 0 # forward |
| 50 | + | | | | |
| 51 | + A | | | | |
| 52 | + | | | | |
| 53 | + ----+ +---------+ +----------+ 1 |
51 | 54 |
|
52 | 55 |
|
53 |
| - +---------+ +---------+ 0 |
54 |
| - | | | | |
55 |
| - A | | | | |
56 |
| - | | | | |
57 |
| - ----+ +---------+ +----------+ 1 # A leading B |
58 |
| - +---------+ +---------+ 0 # backward |
59 |
| - | | | | |
60 |
| - B | | | | |
61 |
| - | | | | |
62 |
| - +---------+ +---------+ +----- 1 |
63 |
| - """ |
64 |
| - def _pulse(self, gpio, level, tick): |
65 |
| - # interrupt comes from pin A |
66 |
| - if (gpio == self._feedback_pin_A): |
67 |
| - self._levelA = level # set level of squared wave (0, 1) on A |
68 |
| - # interrupt comes from pin B |
69 |
| - else: |
70 |
| - self._levelB = level # set level of squared wave (0, 1) on B |
71 |
| - |
72 |
| - if (gpio != self._lastGpio): # debounce |
73 |
| - self._lastGpio = gpio |
74 |
| - |
75 |
| - # backward (A leading B) |
76 |
| - if (gpio == self._feedback_pin_A and level == 1): |
77 |
| - if (self._levelB == 0): |
78 |
| - self._callback(-1) # A leading B, moving backward |
79 |
| - self._direction = -1 # backward |
80 |
| - elif (gpio == self._feedback_pin_A and level == 0): |
81 |
| - if (self._levelB == 1): |
82 |
| - self._callback(-1) # A leading B, moving backward |
83 |
| - self._direction = -1 # backward |
84 |
| - |
85 |
| - # forward (B leading A) |
86 |
| - elif (gpio == self._feedback_pin_B and level == 1): |
87 |
| - if (self._levelA == 0): |
88 |
| - self._callback(1) # B leading A, moving forward |
89 |
| - self._direction = 1 # forward |
90 |
| - elif (gpio == self._feedback_pin_B and level == 0): |
91 |
| - if (self._levelA == 1): |
92 |
| - self._callback(1) # A leading B, moving forward |
93 |
| - self._direction = 1 # forward |
94 |
| - |
95 |
| - def cancel(self): |
96 |
| - |
97 |
| - """ |
98 |
| - Cancel the rotary encoder decoder callbacks. |
99 |
| - """ |
100 |
| - self._callback_triggerA.cancel() |
101 |
| - self._callback_triggerB.cancel() |
| 56 | + +---------+ +---------+ 0 |
| 57 | + | | | | |
| 58 | + A | | | | |
| 59 | + | | | | |
| 60 | + ----+ +---------+ +----------+ 1 # A leading B |
| 61 | + +---------+ +---------+ 0 # backward |
| 62 | + | | | | |
| 63 | + B | | | | |
| 64 | + | | | | |
| 65 | + +---------+ +---------+ +----- 1 |
| 66 | + """ |
| 67 | + def _pulse(self, gpio, level, tick): |
| 68 | + #self._lock.acquire() |
| 69 | + # interrupt comes from pin A |
| 70 | + if (gpio == self._feedback_pin_A): |
| 71 | + self._levelA = level # set level of squared wave (0, 1) on A |
| 72 | + # interrupt comes from pin B |
| 73 | + else: |
| 74 | + self._levelB = level # set level of squared wave (0, 1) on B |
| 75 | + |
| 76 | + if (gpio != self._lastGpio): # debounce |
| 77 | + self._lastGpio = gpio |
| 78 | + |
| 79 | + # backward (A leading B) |
| 80 | + if (gpio == self._feedback_pin_A and level == 1): |
| 81 | + if (self._levelB == 0): |
| 82 | + self._callback(-1) # A leading B, moving backward |
| 83 | + self._direction = -1 # backward |
| 84 | + elif (gpio == self._feedback_pin_A and level == 0): |
| 85 | + if (self._levelB == 1): |
| 86 | + self._callback(-1) # A leading B, moving backward |
| 87 | + self._direction = -1 # backward |
| 88 | + |
| 89 | + # forward (B leading A) |
| 90 | + elif (gpio == self._feedback_pin_B and level == 1): |
| 91 | + if (self._levelA == 0): |
| 92 | + self._callback(1) # B leading A, moving forward |
| 93 | + self._direction = 1 # forward |
| 94 | + elif (gpio == self._feedback_pin_B and level == 0): |
| 95 | + if (self._levelA == 1): |
| 96 | + self._callback(1) # A leading B, moving forward |
| 97 | + self._direction = 1 # forward |
| 98 | + #self._lock.release() |
| 99 | + |
| 100 | + def cancel(self): |
| 101 | + |
| 102 | + """ |
| 103 | + Cancel the rotary encoder decoder callbacks. |
| 104 | + """ |
| 105 | + self._callback_triggerA.cancel() |
| 106 | + self._callback_triggerB.cancel() |
102 | 107 |
|
103 | 108 |
|
0 commit comments