HashedDict

自定义字典

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
	
class HashedDict(object):
'''
自定义hashtable
'''
def __init__(self, size=10):
self.hash_list = [list() for _ in range(size)]
self.size = size
self.length = 0

def __setitem__(self, key, value):
hash_key = hash(key) % self.size
sub_list = self.hash_list[hash_key]
if sub_list:
# hash冲突时使用list向后追加元素
matched=False
for item in sub_list:
if item[0] == key:
item[1] = value
matched=True

if not matched:
sub_list.append([key, value])
self.length += 1
else:
sub_list.append([key, value])
self.length += 1

def __getitem__(self, key):
sub_list = self.hash_list[hash(key) % self.size]
if sub_list:
for item in sub_list:
if item[0] == key:
return item[1]
raise KeyError(key)

def __contains__(self, key):
sub_list = self.hash_list[hash(key) % self.size]
for item in sub_list:
if item[0] == key:
return True
return False

def __repr__(self):
result = []
for sub_list in self.hash_list:
for item in sub_list:
result.append(str(item[0] + ":" + str(item[1])))

return ",".join(result)

def items(self):
for sub_list in self.hash_list:
if not sub_list:
continue
for innitem in sub_list:
yield innitem

def values(self):
for sub_list in self.hash_list:
if not sub_list:
continue
for innitem in sub_list:
yield innitem[1]

def __len__(self):
return self.length

def __str__(self):
return self.__repr__()

test
1
2
3
4
5
6
7
8
9
10
11
12
	
def test_it():
print('test begin')
dd = HashedDict()
dd['a'] = 1
dd['b'] = 2
print(dd)
print(len(dd))
print(repr(dd))
print(list(dd.items()))
print(list(dd.values()))
print('test end')

py.test 测试,mock

pytest使用方法

1
2
3
4
5
6
7
8

# 指定测试文件
py.test -v test_functions.py


# 指定测试目录
py.test -v .

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

'''
# 目录结构
test
.
├── __init__.py
├── functions.py # 方法文件
├── test_app.py # 测试文件
└── test_functions.py # 测试文件
'''


# functions.py

def multiple(x,y ):
return x*y +10


def add_and_multiply(x,y ):
addition=x+y
multi=multiple(x,y)

return (addition, multi)

# ------------------------

# test_functions.py

def test_add_and_multiply():
x,y=3,5

addi, multi=add_and_multiply(x,y )

assert addi==8
assert multi==15



# 要mock的方法要写全路径
@mock.patch('test.functions.multiple', return_value=15 )
def test_add_and_multiply_mock(mock_func):
x = 3
y = 5
# 要mock的方法需指定绝对路径
# test.functions.multiple=mock.Mock(return_value=15)
addi, multi= add_and_multiply(x, y)
print()
assert addi == 8
assert multi == 15



def test_add_and_multiply_mock2():
x = 3
y = 5
# 要mock的方法需指定绝对路径
test.functions.multiple=mock.Mock(return_value=15)
addi, multi= add_and_multiply(x, y)
print()
assert addi == 8
assert multi == 15


测试

'''bash

指定测试文件

py.test -v test_functions.py

指定测试目录

py.test -v .

py.test -v test_functions.py
collected 3 items

test_functions.py::test_add_and_multiply FAILED [ 33%]
test_functions.py::test_add_and_multiply_mock PASSED [ 66%]
test_functions.py::test_add_and_multiply_mock2 PASSED [100%]

=============== FAILURES ============
___________test_add_and_multiply _____

def test_add_and_multiply():
    x,y=3,5

    addi, multi=add_and_multiply(x,y )

    assert addi==8
  assert multi==15

E assert 25 == 15
E -25
E +15

test_functions.py:21: AssertionError
=============== 1 failed, 2 passed in 0.14s ===========

1

celery分布式异步任务队列

celery是一个基于分布消息传递的异步任务队列.它一定需要建立在一个分布的消息传递机制上,这个消息传递机制就是celery文档里常说的broker。

celery隐藏了rabbitmq接口的实现细节,既充当了publisher(client)又充当了consumer (worker)的角色。

'''思考一下,如果我们用rabbitmq自己实现任务队列,有一天我们不想用rabbit了怎么办?我们换个思维,如果没有celery,让你自己设计一个异步任务队列你怎么做。首先,要有一个发起任务的client,选定一定保存任务信息的媒介,由一个worker去一直监听这个信息媒介,这个worker最好是多进程的,另外可以兼容尽可能多得信息媒介。好吧,这个不就是celery所做的事儿么,celery兼容多个broker,既是任务发起者又是执行者,另外支持多进程…还有好多通用功能考虑。
'''

假设项目的目录结构是:

1
2
task--
--celeryapp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 启动消费者
celery -A task.celeryapp worker -l info -c 5

# 消费者异常退出后 ,正常情况下重启后会继续消息已经积压的任务, 看情况要将过期任务清除

# 启动web监控
celery -A task.celeryapp flower -l info --basic_auth=user1:111111
flower -A task.celeryapp --port=8091


# 启动Beat进程,定时任务, 定时将任务发送到broker
celery beat -A task.celeryapp -l info

# 同时启动 消费者和定时任务
celery -B -A task.celeryapp worker -l info

# 也可以在当前目录的上一层来启动 需指定 package.celeryapp


# 后台启动 定时任务和消费者
celery multi restart w1 -B -A task.celeryapp -l info


# celery status -A celery_task
celery -A task.celeryapp inspect stats

# celery.send_task()这个方法解决了producer和consumer的网路拓扑传递数据问题。
celery=Celery()
celery.config_from_object('task.celeryconfig')
celery.send_task('tq.tasks.test', ("hello world",))

# 一般调用方法
from task.tasks import test
print test.delay('param from invoke ')



# 停止正在执行的任务
# 使用方法名调用 可以避免依赖方法的实现代码
...
celery.config_from_object('task.celeryconfig')
from celery.task.control import revoke
revoke('aac00b9c-f701-4a21-bed4-53a8b865a39a', terminate=True)



from celery.result import AsyncResult
res=AsyncResult(t.task_id)
res.state
<!---->

res.revoke(terminate=True)



# 配置信息
json.dumps(celeryapp.control.inspect().conf(), indent=2)

# 已注册的任务列表(得到的列表可以在web页面上做处理, 文件手动触发)
celeryapp.control.inspect().registered_tasks()

# 任务执行情况
celeryapp.control.inspect().active()

# 任务执行情况
celeryapp.control.inspect().stats()



1
2
3
在使用flower时遇到  'stats' inspect method failed , 最终通安装指定版本的kombu 解决 4.5.0
pipenv install kombu==4.5.0

vuejs 列表中的元素,要动态绑定多个样式

用以下的方法 没有成功

1
:class="{ done: item.handled==1 , ignore: item.handled==2, ignore2: item.handled==3 }" >

后来发现下面的方法更稳拓些 , 也方便调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

:class="[gethandle_class(item.handled), getstatus_class(item.status)] ">

methods:{
...

gethandle_class: function (handle_v) {
return {
"1": "done",
"2": "ig",
"3": "ig",
}[handle_v]
},
...

}

Flask 请求处理流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

sockerserver.py.BaseServer.server_forever._handle_request_noblock()
.process_request.finish_request()
.BaseRequestHandle.__init__().handle()
werkzeug.serving.py.WSGIRequestHandler.BaseHTTPRequestHandler.handle().handle_one_request().parse_requests().run_wsgi()
.execute(app)
werkzeug.debug.DebuggedApplication.__call__().Request(environ).response(environ, start_response)
werkzeug.serving.execute(for data in application_iter:).__call__().run_wsgi()
.wsgi_app().request_contex()
flask.ctx.py.push().app_ctx.push(**match_request会将当前请求的地址request.url 用正则解析成路由配置表中的数据及参数 **).session_interface.open_session(app, request)
flask.full_dispatch_request().preprocess_request()
.dispatch_request().view_functions('业务处理')
.finalize_request(rv).make_response()
.process_response(response).session_interface.save_session()

werkzeug.wsgi_app().return response()
.ctx.auto_pop(error)

.shutdown_request(request)